diff --git a/rust/Cargo.lock b/rust/Cargo.lock index cf90be5819..8d8223ce97 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -2775,6 +2775,7 @@ dependencies = [ "derive_builder", "ethers", "futures", + "hyperlane-core", "log", "maplit", "parking_lot 0.12.1", @@ -4035,6 +4036,7 @@ dependencies = [ "config", "convert_case 0.6.0", "derive-new", + "derive_builder", "ed25519-dalek", "ethers", "ethers-prometheus", @@ -4048,6 +4050,7 @@ dependencies = [ "hyperlane-sealevel", "hyperlane-test", "itertools 0.11.0", + "maplit", "paste", "prometheus", "rocksdb", diff --git a/rust/agents/relayer/src/relayer.rs b/rust/agents/relayer/src/relayer.rs index 5ff91466a2..ba4ac30cdb 100644 --- a/rust/agents/relayer/src/relayer.rs +++ b/rust/agents/relayer/src/relayer.rs @@ -9,11 +9,15 @@ use derive_more::AsRef; use eyre::Result; use hyperlane_base::{ db::{HyperlaneRocksDB, DB}, - run_all, BaseAgent, ContractSyncMetrics, CoreMetrics, HyperlaneAgentCore, - SequencedDataContractSync, WatermarkContractSync, + metrics::{AgentMetrics, AgentMetricsUpdater}, + run_all, + settings::ChainConf, + BaseAgent, ContractSyncMetrics, CoreMetrics, HyperlaneAgentCore, SequencedDataContractSync, + WatermarkContractSync, }; use hyperlane_core::{ - HyperlaneDomain, HyperlaneMessage, InterchainGasPayment, MerkleTreeInsertion, U256, + metrics::agent::METRICS_SCRAPE_INTERVAL, HyperlaneDomain, HyperlaneMessage, + InterchainGasPayment, MerkleTreeInsertion, U256, }; use tokio::{ sync::{ @@ -49,7 +53,7 @@ struct ContextKey { #[derive(AsRef)] pub struct Relayer { origin_chains: HashSet, - destination_chains: HashSet, + destination_chains: HashMap, #[as_ref] core: HyperlaneAgentCore, message_syncs: HashMap>>, @@ -67,6 +71,8 @@ pub struct Relayer { transaction_gas_limit: Option, skip_transaction_gas_limit_for: HashSet, allow_local_checkpoint_syncers: bool, + core_metrics: Arc, + agent_metrics: AgentMetrics, } impl Debug for Relayer { @@ -92,11 +98,15 @@ impl BaseAgent for Relayer { type Settings = RelayerSettings; - async fn from_settings(settings: Self::Settings, metrics: Arc) -> Result + async fn from_settings( + settings: Self::Settings, + core_metrics: Arc, + agent_metrics: AgentMetrics, + ) -> Result where Self: Sized, { - let core = settings.build_hyperlane_core(metrics.clone()); + let core = settings.build_hyperlane_core(core_metrics.clone()); let db = DB::from_path(&settings.db)?; let dbs = settings .origin_chains @@ -105,18 +115,18 @@ impl BaseAgent for Relayer { .collect::>(); let mailboxes = settings - .build_mailboxes(settings.destination_chains.iter(), &metrics) + .build_mailboxes(settings.destination_chains.iter(), &core_metrics) .await?; let validator_announces = settings - .build_validator_announces(settings.origin_chains.iter(), &metrics) + .build_validator_announces(settings.origin_chains.iter(), &core_metrics) .await?; - let contract_sync_metrics = Arc::new(ContractSyncMetrics::new(&metrics)); + let contract_sync_metrics = Arc::new(ContractSyncMetrics::new(&core_metrics)); let message_syncs = settings .build_message_indexers( settings.origin_chains.iter(), - &metrics, + &core_metrics, &contract_sync_metrics, dbs.iter() .map(|(d, db)| (d.clone(), Arc::new(db.clone()) as _)) @@ -126,7 +136,7 @@ impl BaseAgent for Relayer { let interchain_gas_payment_syncs = settings .build_interchain_gas_payment_indexers( settings.origin_chains.iter(), - &metrics, + &core_metrics, &contract_sync_metrics, dbs.iter() .map(|(d, db)| (d.clone(), Arc::new(db.clone()) as _)) @@ -136,7 +146,7 @@ impl BaseAgent for Relayer { let merkle_tree_hook_syncs = settings .build_merkle_tree_hook_indexers( settings.origin_chains.iter(), - &metrics, + &core_metrics, &contract_sync_metrics, dbs.iter() .map(|(d, db)| (d.clone(), Arc::new(db.clone()) as _)) @@ -188,9 +198,10 @@ impl BaseAgent for Relayer { .collect(); let mut msg_ctxs = HashMap::new(); + let mut destination_chains = HashMap::new(); for destination in &settings.destination_chains { let destination_chain_setup = core.settings.chain_setup(destination).unwrap().clone(); - + destination_chains.insert(destination.clone(), destination_chain_setup.clone()); let transaction_gas_limit: Option = if skip_transaction_gas_limit_for.contains(&destination.id()) { None @@ -221,7 +232,7 @@ impl BaseAgent for Relayer { metadata_builder, origin_gas_payment_enforcer: gas_payment_enforcers[origin].clone(), transaction_gas_limit, - metrics: MessageSubmissionMetrics::new(&metrics, origin, destination), + metrics: MessageSubmissionMetrics::new(&core_metrics, origin, destination), }), ); } @@ -230,7 +241,7 @@ impl BaseAgent for Relayer { Ok(Self { dbs, origin_chains: settings.origin_chains, - destination_chains: settings.destination_chains, + destination_chains, msg_ctxs, core, message_syncs, @@ -242,6 +253,8 @@ impl BaseAgent for Relayer { transaction_gas_limit, skip_transaction_gas_limit_for, allow_local_checkpoint_syncers: settings.allow_local_checkpoint_syncers, + core_metrics, + agent_metrics, }) } @@ -251,12 +264,32 @@ impl BaseAgent for Relayer { // send channels by destination chain let mut send_channels = HashMap::with_capacity(self.destination_chains.len()); - for destination in &self.destination_chains { + for (dest_domain, dest_conf) in &self.destination_chains { let (send_channel, receive_channel) = mpsc::unbounded_channel::>(); - send_channels.insert(destination.id(), send_channel); + send_channels.insert(dest_domain.id(), send_channel); + + tasks.push(self.run_destination_submitter(dest_domain, receive_channel)); - tasks.push(self.run_destination_submitter(destination, receive_channel)); + let agent_metrics_conf = dest_conf + .agent_metrics_conf(Self::AGENT_NAME.to_string()) + .await + .unwrap(); + let agent_metrics_fetcher = dest_conf.build_provider(&self.core_metrics).await.unwrap(); + let agent_metrics = AgentMetricsUpdater::new( + self.agent_metrics.clone(), + agent_metrics_conf, + agent_metrics_fetcher, + ); + + let fetcher_task = tokio::spawn(async move { + agent_metrics + .start_updating_on_interval(METRICS_SCRAPE_INTERVAL) + .await; + Ok(()) + }) + .instrument(info_span!("AgentMetrics")); + tasks.push(fetcher_task); } for origin in &self.origin_chains { @@ -330,11 +363,11 @@ impl Relayer { let metrics = MessageProcessorMetrics::new( &self.core.metrics, origin, - self.destination_chains.iter(), + self.destination_chains.keys(), ); let destination_ctxs = self .destination_chains - .iter() + .keys() .filter(|&destination| destination != origin) .map(|destination| { ( diff --git a/rust/agents/scraper/migration/bin/common.rs b/rust/agents/scraper/migration/bin/common.rs index 096f7628c1..df7173fc0e 100644 --- a/rust/agents/scraper/migration/bin/common.rs +++ b/rust/agents/scraper/migration/bin/common.rs @@ -1,9 +1,11 @@ -use std::env; +use std::{env, time::Duration}; use migration::sea_orm::{Database, DatabaseConnection}; pub use migration::{DbErr, Migrator, MigratorTrait as _}; +use sea_orm::ConnectOptions; const LOCAL_DATABASE_URL: &str = "postgresql://postgres:47221c18c610@localhost:5432/postgres"; +const CONNECT_TIMEOUT: u64 = 20; pub fn url() -> String { env::var("DATABASE_URL").unwrap_or_else(|_| LOCAL_DATABASE_URL.into()) @@ -16,6 +18,8 @@ pub async fn init() -> Result { .init(); let url = url(); + let mut options: ConnectOptions = url.clone().into(); + options.connect_timeout(Duration::from_secs(CONNECT_TIMEOUT)); println!("Connecting to {url}"); - Database::connect(url).await + Database::connect(options).await } diff --git a/rust/agents/scraper/src/agent.rs b/rust/agents/scraper/src/agent.rs index b582f8e2ef..9941c6a806 100644 --- a/rust/agents/scraper/src/agent.rs +++ b/rust/agents/scraper/src/agent.rs @@ -3,8 +3,8 @@ use std::{collections::HashMap, sync::Arc}; use async_trait::async_trait; use derive_more::AsRef; use hyperlane_base::{ - run_all, settings::IndexSettings, BaseAgent, ContractSyncMetrics, CoreMetrics, - HyperlaneAgentCore, + metrics::AgentMetrics, run_all, settings::IndexSettings, BaseAgent, ContractSyncMetrics, + CoreMetrics, HyperlaneAgentCore, }; use hyperlane_core::HyperlaneDomain; use tokio::task::JoinHandle; @@ -38,6 +38,7 @@ impl BaseAgent for Scraper { async fn from_settings( settings: Self::Settings, metrics: Arc, + _agent_metrics: AgentMetrics, ) -> eyre::Result where Self: Sized, diff --git a/rust/agents/validator/src/validator.rs b/rust/agents/validator/src/validator.rs index 96feb97b8f..42474030ed 100644 --- a/rust/agents/validator/src/validator.rs +++ b/rust/agents/validator/src/validator.rs @@ -10,6 +10,7 @@ use tracing::{error, info, info_span, instrument::Instrumented, warn, Instrument use hyperlane_base::{ db::{HyperlaneRocksDB, DB}, + metrics::AgentMetrics, run_all, BaseAgent, CheckpointSyncer, ContractSyncMetrics, CoreMetrics, HyperlaneAgentCore, SequencedDataContractSync, }; @@ -51,7 +52,11 @@ impl BaseAgent for Validator { type Settings = ValidatorSettings; - async fn from_settings(settings: Self::Settings, metrics: Arc) -> Result + async fn from_settings( + settings: Self::Settings, + metrics: Arc, + _agent_metrics: AgentMetrics, + ) -> Result where Self: Sized, { diff --git a/rust/chains/hyperlane-cosmos/src/aggregation_ism.rs b/rust/chains/hyperlane-cosmos/src/aggregation_ism.rs index c9d7200117..a17a4ba3af 100644 --- a/rust/chains/hyperlane-cosmos/src/aggregation_ism.rs +++ b/rust/chains/hyperlane-cosmos/src/aggregation_ism.rs @@ -2,7 +2,7 @@ use std::str::FromStr; use crate::{ address::CosmosAddress, - grpc::{WasmGrpcProvider, WasmProvider}, + grpc::WasmProvider, payloads::aggregate_ism::{ModulesAndThresholdRequest, ModulesAndThresholdResponse}, ConnectionConf, CosmosProvider, Signer, }; @@ -18,7 +18,7 @@ use tracing::instrument; pub struct CosmosAggregationIsm { domain: HyperlaneDomain, address: H256, - provider: Box, + provider: Box, } impl CosmosAggregationIsm { @@ -28,7 +28,12 @@ impl CosmosAggregationIsm { locator: ContractLocator, signer: Option, ) -> ChainResult { - let provider = WasmGrpcProvider::new(conf.clone(), locator.clone(), signer)?; + let provider = CosmosProvider::new( + locator.domain.clone(), + conf.clone(), + Some(locator.clone()), + signer, + )?; Ok(Self { domain: locator.domain.clone(), @@ -50,7 +55,7 @@ impl HyperlaneChain for CosmosAggregationIsm { } fn provider(&self) -> Box { - Box::new(CosmosProvider::new(self.domain.clone())) + self.provider.clone() } } @@ -63,7 +68,7 @@ impl AggregationIsm for CosmosAggregationIsm { ) -> ChainResult<(Vec, u8)> { let payload = ModulesAndThresholdRequest::new(message); - let data = self.provider.wasm_query(payload, None).await?; + let data = self.provider.grpc().wasm_query(payload, None).await?; let response: ModulesAndThresholdResponse = serde_json::from_slice(&data)?; let modules: ChainResult> = response diff --git a/rust/chains/hyperlane-cosmos/src/interchain_gas.rs b/rust/chains/hyperlane-cosmos/src/interchain_gas.rs index d96bfb0bab..491274c846 100644 --- a/rust/chains/hyperlane-cosmos/src/interchain_gas.rs +++ b/rust/chains/hyperlane-cosmos/src/interchain_gas.rs @@ -10,7 +10,6 @@ use once_cell::sync::Lazy; use std::ops::RangeInclusive; use crate::{ - grpc::WasmGrpcProvider, rpc::{CosmosWasmIndexer, ParsedEvent, WasmIndexer}, signers::Signer, utils::{CONTRACT_ADDRESS_ATTRIBUTE_KEY, CONTRACT_ADDRESS_ATTRIBUTE_KEY_BASE64}, @@ -22,6 +21,7 @@ use crate::{ pub struct CosmosInterchainGasPaymaster { domain: HyperlaneDomain, address: H256, + provider: CosmosProvider, } impl HyperlaneContract for CosmosInterchainGasPaymaster { @@ -36,7 +36,7 @@ impl HyperlaneChain for CosmosInterchainGasPaymaster { } fn provider(&self) -> Box { - Box::new(CosmosProvider::new(self.domain.clone())) + Box::new(self.provider.clone()) } } @@ -49,11 +49,17 @@ impl CosmosInterchainGasPaymaster { locator: ContractLocator, signer: Option, ) -> ChainResult { - let provider = WasmGrpcProvider::new(conf.clone(), locator.clone(), signer)?; + let provider = CosmosProvider::new( + locator.domain.clone(), + conf.clone(), + Some(locator.clone()), + signer, + )?; Ok(Self { domain: locator.domain.clone(), address: locator.address, + provider, }) } } diff --git a/rust/chains/hyperlane-cosmos/src/interchain_security_module.rs b/rust/chains/hyperlane-cosmos/src/interchain_security_module.rs index 72a0ac984d..9e726e5628 100644 --- a/rust/chains/hyperlane-cosmos/src/interchain_security_module.rs +++ b/rust/chains/hyperlane-cosmos/src/interchain_security_module.rs @@ -5,7 +5,7 @@ use hyperlane_core::{ }; use crate::{ - grpc::{WasmGrpcProvider, WasmProvider}, + grpc::WasmProvider, payloads::{ general::EmptyStruct, ism_routes::{QueryIsmGeneralRequest, QueryIsmModuleTypeRequest}, @@ -22,7 +22,7 @@ pub struct CosmosInterchainSecurityModule { /// The address of the ISM contract. address: H256, /// The provider for the ISM contract. - provider: Box, + provider: CosmosProvider, } /// The Cosmos Interchain Security Module Implementation. @@ -33,13 +33,17 @@ impl CosmosInterchainSecurityModule { locator: ContractLocator, signer: Option, ) -> ChainResult { - let provider: WasmGrpcProvider = - WasmGrpcProvider::new(conf.clone(), locator.clone(), signer)?; + let provider = CosmosProvider::new( + locator.domain.clone(), + conf.clone(), + Some(locator.clone()), + signer, + )?; Ok(Self { domain: locator.domain.clone(), address: locator.address, - provider: Box::new(provider), + provider, }) } } @@ -56,7 +60,7 @@ impl HyperlaneChain for CosmosInterchainSecurityModule { } fn provider(&self) -> Box { - Box::new(CosmosProvider::new(self.domain.clone())) + Box::new(self.provider.clone()) } } @@ -71,6 +75,7 @@ impl InterchainSecurityModule for CosmosInterchainSecurityModule { let data = self .provider + .grpc() .wasm_query(QueryIsmGeneralRequest { ism: query }, None) .await?; diff --git a/rust/chains/hyperlane-cosmos/src/libs/address.rs b/rust/chains/hyperlane-cosmos/src/libs/address.rs index d5970b9b82..507bd24172 100644 --- a/rust/chains/hyperlane-cosmos/src/libs/address.rs +++ b/rust/chains/hyperlane-cosmos/src/libs/address.rs @@ -12,7 +12,7 @@ use tendermint::public_key::PublicKey as TendermintPublicKey; use crate::HyperlaneCosmosError; /// Wrapper around the cosmrs AccountId type that abstracts bech32 encoding -#[derive(new, Debug)] +#[derive(new, Debug, Clone)] pub struct CosmosAddress { /// Bech32 encoded cosmos account account_id: AccountId, @@ -132,6 +132,11 @@ pub mod test { addr.address(), "neutron1kknekjxg0ear00dky5ykzs8wwp2gz62z9s6aaj" ); + // TODO: watch out for this edge case. This check will fail unless + // the first 12 bytes are removed from the digest. + // let digest = addr.digest(); + // let addr2 = CosmosAddress::from_h256(digest, prefix).expect("Cosmos address creation failed"); + // assert_eq!(addr.address(), addr2.address()); } #[test] diff --git a/rust/chains/hyperlane-cosmos/src/mailbox.rs b/rust/chains/hyperlane-cosmos/src/mailbox.rs index 4df968edc9..4aafd29c87 100644 --- a/rust/chains/hyperlane-cosmos/src/mailbox.rs +++ b/rust/chains/hyperlane-cosmos/src/mailbox.rs @@ -14,10 +14,7 @@ use crate::payloads::{general, mailbox}; use crate::rpc::{CosmosWasmIndexer, ParsedEvent, WasmIndexer}; use crate::CosmosProvider; use crate::{address::CosmosAddress, types::tx_response_to_outcome}; -use crate::{ - grpc::{WasmGrpcProvider, WasmProvider}, - HyperlaneCosmosError, -}; +use crate::{grpc::WasmProvider, HyperlaneCosmosError}; use crate::{signers::Signer, utils::get_block_height_for_lag, ConnectionConf}; use async_trait::async_trait; use cosmrs::proto::cosmos::base::abci::v1beta1::TxResponse; @@ -40,7 +37,7 @@ pub struct CosmosMailbox { config: ConnectionConf, domain: HyperlaneDomain, address: H256, - provider: Box, + provider: CosmosProvider, } impl CosmosMailbox { @@ -51,13 +48,18 @@ impl CosmosMailbox { locator: ContractLocator, signer: Option, ) -> ChainResult { - let provider = WasmGrpcProvider::new(conf.clone(), locator.clone(), signer)?; + let provider = CosmosProvider::new( + locator.domain.clone(), + conf.clone(), + Some(locator.clone()), + signer, + )?; Ok(Self { config: conf, domain: locator.domain.clone(), address: locator.address, - provider: Box::new(provider), + provider, }) } @@ -79,7 +81,7 @@ impl HyperlaneChain for CosmosMailbox { } fn provider(&self) -> Box { - Box::new(CosmosProvider::new(self.domain.clone())) + Box::new(self.provider.clone()) } } @@ -94,7 +96,7 @@ impl Debug for CosmosMailbox { impl Mailbox for CosmosMailbox { #[instrument(level = "debug", err, ret, skip(self))] async fn count(&self, lag: Option) -> ChainResult { - let block_height = get_block_height_for_lag(&self.provider, lag).await?; + let block_height = get_block_height_for_lag(self.provider.grpc(), lag).await?; self.nonce_at_block(block_height).await } @@ -107,6 +109,7 @@ impl Mailbox for CosmosMailbox { let delivered = match self .provider + .grpc() .wasm_query(GeneralMailboxQuery { mailbox: payload }, None) .await { @@ -136,6 +139,7 @@ impl Mailbox for CosmosMailbox { let data = self .provider + .grpc() .wasm_query(GeneralMailboxQuery { mailbox: payload }, None) .await?; let response: mailbox::DefaultIsmResponse = serde_json::from_slice(&data)?; @@ -157,6 +161,7 @@ impl Mailbox for CosmosMailbox { let data = self .provider + .grpc() .wasm_query(GeneralMailboxQuery { mailbox: payload }, None) .await?; let response: mailbox::RecipientIsmResponse = serde_json::from_slice(&data)?; @@ -182,6 +187,7 @@ impl Mailbox for CosmosMailbox { let response: TxResponse = self .provider + .grpc() .wasm_send(process_message, tx_gas_limit) .await?; @@ -201,7 +207,11 @@ impl Mailbox for CosmosMailbox { }, }; - let gas_limit = self.provider.wasm_estimate_gas(process_message).await?; + let gas_limit = self + .provider + .grpc() + .wasm_estimate_gas(process_message) + .await?; let result = TxCostEstimate { gas_limit: gas_limit.into(), @@ -226,6 +236,7 @@ impl CosmosMailbox { let data = self .provider + .grpc() .wasm_query(GeneralMailboxQuery { mailbox: payload }, block_height) .await?; diff --git a/rust/chains/hyperlane-cosmos/src/merkle_tree_hook.rs b/rust/chains/hyperlane-cosmos/src/merkle_tree_hook.rs index 15db14ff7a..3de0821dff 100644 --- a/rust/chains/hyperlane-cosmos/src/merkle_tree_hook.rs +++ b/rust/chains/hyperlane-cosmos/src/merkle_tree_hook.rs @@ -12,7 +12,7 @@ use once_cell::sync::Lazy; use tracing::instrument; use crate::{ - grpc::{WasmGrpcProvider, WasmProvider}, + grpc::WasmProvider, payloads::{ general::{self}, merkle_tree_hook, @@ -33,7 +33,7 @@ pub struct CosmosMerkleTreeHook { /// Contract address address: H256, /// Provider - provider: Box, + provider: CosmosProvider, } impl CosmosMerkleTreeHook { @@ -43,12 +43,17 @@ impl CosmosMerkleTreeHook { locator: ContractLocator, signer: Option, ) -> ChainResult { - let provider = WasmGrpcProvider::new(conf.clone(), locator.clone(), signer)?; + let provider = CosmosProvider::new( + locator.domain.clone(), + conf.clone(), + Some(locator.clone()), + signer, + )?; Ok(Self { domain: locator.domain.clone(), address: locator.address, - provider: Box::new(provider), + provider, }) } } @@ -65,7 +70,7 @@ impl HyperlaneChain for CosmosMerkleTreeHook { } fn provider(&self) -> Box { - Box::new(CosmosProvider::new(self.domain.clone())) + Box::new(self.provider.clone()) } } @@ -78,10 +83,11 @@ impl MerkleTreeHook for CosmosMerkleTreeHook { tree: general::EmptyStruct {}, }; - let block_height = get_block_height_for_lag(&self.provider, lag).await?; + let block_height = get_block_height_for_lag(self.provider.grpc(), lag).await?; let data = self .provider + .grpc() .wasm_query( merkle_tree_hook::MerkleTreeGenericRequest { merkle_hook: payload, @@ -111,7 +117,7 @@ impl MerkleTreeHook for CosmosMerkleTreeHook { count: general::EmptyStruct {}, }; - let block_height = get_block_height_for_lag(&self.provider, lag).await?; + let block_height = get_block_height_for_lag(self.provider.grpc(), lag).await?; self.count_at_block(block_height).await } @@ -122,10 +128,11 @@ impl MerkleTreeHook for CosmosMerkleTreeHook { check_point: general::EmptyStruct {}, }; - let block_height = get_block_height_for_lag(&self.provider, lag).await?; + let block_height = get_block_height_for_lag(self.provider.grpc(), lag).await?; let data = self .provider + .grpc() .wasm_query( merkle_tree_hook::MerkleTreeGenericRequest { merkle_hook: payload, @@ -153,6 +160,7 @@ impl CosmosMerkleTreeHook { let data = self .provider + .grpc() .wasm_query( merkle_tree_hook::MerkleTreeGenericRequest { merkle_hook: payload, diff --git a/rust/chains/hyperlane-cosmos/src/multisig_ism.rs b/rust/chains/hyperlane-cosmos/src/multisig_ism.rs index a9d84dec7f..d558acfa37 100644 --- a/rust/chains/hyperlane-cosmos/src/multisig_ism.rs +++ b/rust/chains/hyperlane-cosmos/src/multisig_ism.rs @@ -1,9 +1,7 @@ use std::str::FromStr; use crate::{ - grpc::{WasmGrpcProvider, WasmProvider}, - payloads::ism_routes::QueryIsmGeneralRequest, - signers::Signer, + grpc::WasmProvider, payloads::ism_routes::QueryIsmGeneralRequest, signers::Signer, ConnectionConf, CosmosProvider, }; use async_trait::async_trait; @@ -19,7 +17,7 @@ use crate::payloads::multisig_ism::{self, VerifyInfoRequest, VerifyInfoRequestIn pub struct CosmosMultisigIsm { domain: HyperlaneDomain, address: H256, - provider: Box, + provider: CosmosProvider, } impl CosmosMultisigIsm { @@ -29,12 +27,17 @@ impl CosmosMultisigIsm { locator: ContractLocator, signer: Option, ) -> ChainResult { - let provider = WasmGrpcProvider::new(conf.clone(), locator.clone(), signer)?; + let provider = CosmosProvider::new( + locator.domain.clone(), + conf.clone(), + Some(locator.clone()), + signer, + )?; Ok(Self { domain: locator.domain.clone(), address: locator.address, - provider: Box::new(provider), + provider, }) } } @@ -51,7 +54,7 @@ impl HyperlaneChain for CosmosMultisigIsm { } fn provider(&self) -> Box { - Box::new(CosmosProvider::new(self.domain.clone())) + Box::new(self.provider.clone()) } } @@ -70,6 +73,7 @@ impl MultisigIsm for CosmosMultisigIsm { let data = self .provider + .grpc() .wasm_query(QueryIsmGeneralRequest { ism: payload }, None) .await?; let response: multisig_ism::VerifyInfoResponse = serde_json::from_slice(&data)?; diff --git a/rust/chains/hyperlane-cosmos/src/providers/grpc.rs b/rust/chains/hyperlane-cosmos/src/providers/grpc.rs index a47d660ded..0cbc7f2aff 100644 --- a/rust/chains/hyperlane-cosmos/src/providers/grpc.rs +++ b/rust/chains/hyperlane-cosmos/src/providers/grpc.rs @@ -5,6 +5,7 @@ use cosmrs::{ auth::v1beta1::{ query_client::QueryClient as QueryAccountClient, BaseAccount, QueryAccountRequest, }, + bank::v1beta1::{query_client::QueryClient as QueryBalanceClient, QueryBalanceRequest}, base::{ abci::v1beta1::TxResponse, tendermint::v1beta1::{service_client::ServiceClient, GetLatestBlockRequest}, @@ -77,14 +78,14 @@ pub trait WasmProvider: Send + Sync { async fn wasm_estimate_gas(&self, payload: T) -> ChainResult; } -#[derive(Debug)] +#[derive(Debug, Clone)] /// CosmWasm GRPC provider. pub struct WasmGrpcProvider { /// Connection configuration. conf: ConnectionConf, /// A contract address that can be used as the default /// for queries / sends / estimates. - contract_address: CosmosAddress, + contract_address: Option, /// Signer for transactions. signer: Option, /// GRPC Channel that can be cheaply cloned. @@ -96,13 +97,15 @@ impl WasmGrpcProvider { /// Create new CosmWasm GRPC Provider. pub fn new( conf: ConnectionConf, - locator: ContractLocator, + locator: Option, signer: Option, ) -> ChainResult { let endpoint = Endpoint::new(conf.get_grpc_url()).map_err(Into::::into)?; let channel = endpoint.connect_lazy(); - let contract_address = CosmosAddress::from_h256(locator.address, &conf.get_prefix())?; + let contract_address = locator + .map(|l| CosmosAddress::from_h256(l.address, &conf.get_prefix())) + .transpose()?; Ok(Self { conf, @@ -220,6 +223,24 @@ impl WasmGrpcProvider { Ok(gas_estimate) } + /// Fetches balance for a given `address` and `denom` + pub async fn get_balance(&self, address: String, denom: String) -> ChainResult { + let mut client = QueryBalanceClient::new(self.channel.clone()); + + let balance_request = tonic::Request::new(QueryBalanceRequest { address, denom }); + let response = client + .balance(balance_request) + .await + .map_err(ChainCommunicationError::from_other)? + .into_inner(); + + let balance = response + .balance + .ok_or_else(|| ChainCommunicationError::from_other_str("account not present"))?; + + Ok(balance.amount.parse()?) + } + /// Queries an account. async fn account_query(&self, account: String) -> ChainResult { let mut client = QueryAccountClient::new(self.channel.clone()); @@ -268,7 +289,10 @@ impl WasmProvider for WasmGrpcProvider { where T: Serialize + Send + Sync, { - self.wasm_query_to(self.contract_address.address(), payload, block_height) + let contract_address = self.contract_address.as_ref().ok_or_else(|| { + ChainCommunicationError::from_other_str("No contract address available") + })?; + self.wasm_query_to(contract_address.address(), payload, block_height) .await } @@ -308,10 +332,13 @@ impl WasmProvider for WasmGrpcProvider { { let signer = self.get_signer()?; let mut client = TxServiceClient::new(self.channel.clone()); + let contract_address = self.contract_address.as_ref().ok_or_else(|| { + ChainCommunicationError::from_other_str("No contract address available") + })?; let msgs = vec![MsgExecuteContract { sender: signer.address.clone(), - contract: self.contract_address.address(), + contract: contract_address.address(), msg: serde_json::to_string(&payload)?.as_bytes().to_vec(), funds: vec![], } @@ -354,9 +381,12 @@ impl WasmProvider for WasmGrpcProvider { // Estimating gas requires a signer, which we can reasonably expect to have // since we need one to send a tx with the estimated gas anyways. let signer = self.get_signer()?; + let contract_address = self.contract_address.as_ref().ok_or_else(|| { + ChainCommunicationError::from_other_str("No contract address available") + })?; let msg = MsgExecuteContract { sender: signer.address.clone(), - contract: self.contract_address.address(), + contract: contract_address.address(), msg: serde_json::to_string(&payload)?.as_bytes().to_vec(), funds: vec![], }; diff --git a/rust/chains/hyperlane-cosmos/src/providers/mod.rs b/rust/chains/hyperlane-cosmos/src/providers/mod.rs index cf9422b2f8..973a886a3c 100644 --- a/rust/chains/hyperlane-cosmos/src/providers/mod.rs +++ b/rust/chains/hyperlane-cosmos/src/providers/mod.rs @@ -1,23 +1,63 @@ use async_trait::async_trait; use hyperlane_core::{ - BlockInfo, ChainResult, HyperlaneChain, HyperlaneDomain, HyperlaneProvider, TxnInfo, H256, + BlockInfo, ChainResult, ContractLocator, HyperlaneChain, HyperlaneDomain, HyperlaneProvider, + TxnInfo, H256, U256, }; +use tendermint_rpc::{client::CompatMode, HttpClient}; + +use crate::{ConnectionConf, HyperlaneCosmosError, Signer}; + +use self::grpc::WasmGrpcProvider; /// cosmos grpc provider pub mod grpc; /// cosmos rpc provider pub mod rpc; -/// A reference to a Cosmos chain -#[derive(Debug)] +/// Abstraction over a connection to a Cosmos chain +#[derive(Debug, Clone)] pub struct CosmosProvider { domain: HyperlaneDomain, + canonical_asset: String, + grpc_client: WasmGrpcProvider, + rpc_client: HttpClient, } impl CosmosProvider { /// Create a reference to a Cosmos chain - pub fn new(domain: HyperlaneDomain) -> Self { - Self { domain } + pub fn new( + domain: HyperlaneDomain, + conf: ConnectionConf, + locator: Option, + signer: Option, + ) -> ChainResult { + let grpc_client = WasmGrpcProvider::new(conf.clone(), locator, signer)?; + let rpc_client = HttpClient::builder( + conf.get_rpc_url() + .parse() + .map_err(Into::::into)?, + ) + // Consider supporting different compatibility modes. + .compat_mode(CompatMode::latest()) + .build() + .map_err(Into::::into)?; + + Ok(Self { + domain, + rpc_client, + grpc_client, + canonical_asset: conf.get_canonical_asset(), + }) + } + + /// Get a grpc client + pub fn grpc(&self) -> &WasmGrpcProvider { + &self.grpc_client + } + + /// Get an rpc client + pub fn rpc(&self) -> &HttpClient { + &self.rpc_client } } @@ -27,9 +67,7 @@ impl HyperlaneChain for CosmosProvider { } fn provider(&self) -> Box { - Box::new(CosmosProvider { - domain: self.domain.clone(), - }) + Box::new(self.clone()) } } @@ -47,4 +85,11 @@ impl HyperlaneProvider for CosmosProvider { // FIXME Ok(true) } + + async fn get_balance(&self, address: String) -> ChainResult { + Ok(self + .grpc_client + .get_balance(address, self.canonical_asset.clone()) + .await?) + } } diff --git a/rust/chains/hyperlane-cosmos/src/providers/rpc.rs b/rust/chains/hyperlane-cosmos/src/providers/rpc.rs index 88c5ded065..1f0d2a24a1 100644 --- a/rust/chains/hyperlane-cosmos/src/providers/rpc.rs +++ b/rust/chains/hyperlane-cosmos/src/providers/rpc.rs @@ -1,7 +1,7 @@ use std::ops::RangeInclusive; use async_trait::async_trait; -use cosmrs::rpc::client::{Client, CompatMode, HttpClient}; +use cosmrs::rpc::client::Client; use cosmrs::rpc::endpoint::{tx, tx_search::Response as TxSearchResponse}; use cosmrs::rpc::query::Query; use cosmrs::rpc::Order; @@ -10,7 +10,7 @@ use hyperlane_core::{ChainCommunicationError, ChainResult, ContractLocator, LogM use tracing::{instrument, trace}; use crate::address::CosmosAddress; -use crate::{ConnectionConf, HyperlaneCosmosError}; +use crate::{ConnectionConf, CosmosProvider, HyperlaneCosmosError}; const PAGINATION_LIMIT: u8 = 100; @@ -50,7 +50,7 @@ impl ParsedEvent { #[derive(Debug)] /// Cosmwasm RPC Provider pub struct CosmosWasmIndexer { - client: HttpClient, + provider: CosmosProvider, contract_address: CosmosAddress, target_event_kind: String, reorg_period: u32, @@ -66,17 +66,14 @@ impl CosmosWasmIndexer { event_type: String, reorg_period: u32, ) -> ChainResult { - let client = HttpClient::builder( - conf.get_rpc_url() - .parse() - .map_err(Into::::into)?, - ) - // Consider supporting different compatibility modes. - .compat_mode(CompatMode::latest()) - .build() - .map_err(Into::::into)?; + let provider = CosmosProvider::new( + locator.domain.clone(), + conf.clone(), + Some(locator.clone()), + None, + )?; Ok(Self { - client, + provider, contract_address: CosmosAddress::from_h256( locator.address, conf.get_prefix().as_str(), @@ -91,7 +88,8 @@ impl CosmosWasmIndexer { #[instrument(level = "trace", err, skip(self))] async fn tx_search(&self, query: Query, page: u32) -> ChainResult { Ok(self - .client + .provider + .rpc() .tx_search(query, false, page, PAGINATION_LIMIT, Order::Ascending) .await .map_err(Into::::into)?) @@ -176,7 +174,8 @@ impl CosmosWasmIndexer { impl WasmIndexer for CosmosWasmIndexer { async fn get_finalized_block_number(&self) -> ChainResult { let latest_height: u32 = self - .client + .provider + .rpc() .latest_block() .await .map_err(Into::::into)? diff --git a/rust/chains/hyperlane-cosmos/src/routing_ism.rs b/rust/chains/hyperlane-cosmos/src/routing_ism.rs index 0a646c005b..63b759f1b9 100644 --- a/rust/chains/hyperlane-cosmos/src/routing_ism.rs +++ b/rust/chains/hyperlane-cosmos/src/routing_ism.rs @@ -9,7 +9,7 @@ use hyperlane_core::{ use crate::{ address::CosmosAddress, - grpc::{WasmGrpcProvider, WasmProvider}, + grpc::WasmProvider, payloads::ism_routes::{ IsmRouteRequest, IsmRouteRequestInner, IsmRouteRespnose, QueryRoutingIsmGeneralRequest, }, @@ -22,7 +22,7 @@ use crate::{ pub struct CosmosRoutingIsm { domain: HyperlaneDomain, address: H256, - provider: Box, + provider: CosmosProvider, } impl CosmosRoutingIsm { @@ -32,12 +32,17 @@ impl CosmosRoutingIsm { locator: ContractLocator, signer: Option, ) -> ChainResult { - let provider = WasmGrpcProvider::new(conf.clone(), locator.clone(), signer)?; + let provider = CosmosProvider::new( + locator.domain.clone(), + conf.clone(), + Some(locator.clone()), + signer, + )?; Ok(Self { domain: locator.domain.clone(), address: locator.address, - provider: Box::new(provider), + provider, }) } } @@ -54,7 +59,7 @@ impl HyperlaneChain for CosmosRoutingIsm { } fn provider(&self) -> Box { - Box::new(CosmosProvider::new(self.domain.clone())) + Box::new(self.provider.clone()) } } @@ -69,6 +74,7 @@ impl RoutingIsm for CosmosRoutingIsm { let data = self .provider + .grpc() .wasm_query( QueryRoutingIsmGeneralRequest { routing_ism: payload, diff --git a/rust/chains/hyperlane-cosmos/src/validator_announce.rs b/rust/chains/hyperlane-cosmos/src/validator_announce.rs index 69f7121b88..6b0ee04930 100644 --- a/rust/chains/hyperlane-cosmos/src/validator_announce.rs +++ b/rust/chains/hyperlane-cosmos/src/validator_announce.rs @@ -7,7 +7,7 @@ use hyperlane_core::{ }; use crate::{ - grpc::{WasmGrpcProvider, WasmProvider}, + grpc::WasmProvider, payloads::validator_announce::{ self, AnnouncementRequest, AnnouncementRequestInner, GetAnnounceStorageLocationsRequest, GetAnnounceStorageLocationsRequestInner, @@ -22,7 +22,7 @@ use crate::{ pub struct CosmosValidatorAnnounce { domain: HyperlaneDomain, address: H256, - provider: Box, + provider: CosmosProvider, } impl CosmosValidatorAnnounce { @@ -32,12 +32,17 @@ impl CosmosValidatorAnnounce { locator: ContractLocator, signer: Option, ) -> ChainResult { - let provider = WasmGrpcProvider::new(conf.clone(), locator.clone(), signer)?; + let provider = CosmosProvider::new( + locator.domain.clone(), + conf.clone(), + Some(locator.clone()), + signer, + )?; Ok(Self { domain: locator.domain.clone(), address: locator.address, - provider: Box::new(provider), + provider, }) } } @@ -54,7 +59,7 @@ impl HyperlaneChain for CosmosValidatorAnnounce { } fn provider(&self) -> Box { - Box::new(CosmosProvider::new(self.domain.clone())) + Box::new(self.provider.clone()) } } @@ -76,7 +81,7 @@ impl ValidatorAnnounce for CosmosValidatorAnnounce { }, }; - let data: Vec = self.provider.wasm_query(payload, None).await?; + let data: Vec = self.provider.grpc().wasm_query(payload, None).await?; let response: validator_announce::GetAnnounceStorageLocationsResponse = serde_json::from_slice(&data)?; @@ -102,6 +107,7 @@ impl ValidatorAnnounce for CosmosValidatorAnnounce { let response: TxResponse = self .provider + .grpc() .wasm_send(announce_request, tx_gas_limit) .await?; diff --git a/rust/chains/hyperlane-ethereum/src/provider.rs b/rust/chains/hyperlane-ethereum/src/provider.rs index 6ea06433d6..5fced1aaf4 100644 --- a/rust/chains/hyperlane-ethereum/src/provider.rs +++ b/rust/chains/hyperlane-ethereum/src/provider.rs @@ -6,7 +6,8 @@ use std::time::Duration; use async_trait::async_trait; use derive_new::new; use ethers::prelude::Middleware; -use hyperlane_core::ethers_core_types; +use ethers_core::abi::Address; +use hyperlane_core::{ethers_core_types, U256}; use tokio::time::sleep; use tracing::instrument; @@ -105,6 +106,19 @@ where .map_err(ChainCommunicationError::from_other)?; Ok(!code.is_empty()) } + + #[instrument(err, skip(self))] + async fn get_balance(&self, address: String) -> ChainResult { + // Can't use the address directly as a string, because ethers interprets it + // as an ENS name rather than an address. + let addr: Address = address.parse()?; + let balance = self + .provider + .get_balance(addr, None) + .await + .map_err(ChainCommunicationError::from_other)?; + Ok(balance.into()) + } } impl EthereumProvider diff --git a/rust/chains/hyperlane-ethereum/src/trait_builder.rs b/rust/chains/hyperlane-ethereum/src/trait_builder.rs index 03a33c2fdc..89e4f31d4f 100644 --- a/rust/chains/hyperlane-ethereum/src/trait_builder.rs +++ b/rust/chains/hyperlane-ethereum/src/trait_builder.rs @@ -10,6 +10,7 @@ use ethers::prelude::{ Http, JsonRpcClient, Middleware, NonceManagerMiddleware, Provider, Quorum, QuorumProvider, SignerMiddleware, WeightedProvider, Ws, WsClientError, }; +use hyperlane_core::metrics::agent::METRICS_SCRAPE_INTERVAL; use reqwest::{Client, Url}; use thiserror::Error; @@ -27,7 +28,6 @@ use hyperlane_core::{ use crate::{signers::Signers, ConnectionConf, FallbackProvider, RetryingProvider}; // This should be whatever the prometheus scrape interval is -const METRICS_SCRAPE_INTERVAL: Duration = Duration::from_secs(60); const HTTP_CLIENT_TIMEOUT: Duration = Duration::from_secs(60); /// An error when connecting to an ethereum provider. @@ -194,6 +194,9 @@ pub trait BuildableWithProvider { Ok(if let Some(metrics) = metrics { let provider = Arc::new(PrometheusMiddleware::new(provider, metrics.0, metrics.1)); + // TODO: This task is spawned each time `.build_ethereum(...)` is called, which is about 15 times, + // in spite of it doing the same thing, wasting resources. + // Only spawn this once along with the other agent tasks. tokio::spawn(provider.start_updating_on_interval(METRICS_SCRAPE_INTERVAL)); self.build_with_signer(provider, locator, signer).await? } else { diff --git a/rust/chains/hyperlane-fuel/src/provider.rs b/rust/chains/hyperlane-fuel/src/provider.rs index 92303f5795..8048076e04 100644 --- a/rust/chains/hyperlane-fuel/src/provider.rs +++ b/rust/chains/hyperlane-fuel/src/provider.rs @@ -1,7 +1,7 @@ use async_trait::async_trait; use hyperlane_core::{ - BlockInfo, ChainResult, HyperlaneChain, HyperlaneDomain, HyperlaneProvider, TxnInfo, H256, + BlockInfo, ChainResult, HyperlaneChain, HyperlaneDomain, HyperlaneProvider, TxnInfo, H256, U256, }; /// A wrapper around a fuel provider to get generic blockchain information. @@ -31,4 +31,8 @@ impl HyperlaneProvider for FuelProvider { async fn is_contract(&self, address: &H256) -> ChainResult { todo!() } + + async fn get_balance(&self, address: String) -> ChainResult { + todo!() + } } diff --git a/rust/chains/hyperlane-sealevel/src/provider.rs b/rust/chains/hyperlane-sealevel/src/provider.rs index b853e30e4b..47be23014c 100644 --- a/rust/chains/hyperlane-sealevel/src/provider.rs +++ b/rust/chains/hyperlane-sealevel/src/provider.rs @@ -1,7 +1,7 @@ use async_trait::async_trait; use hyperlane_core::{ - BlockInfo, ChainResult, HyperlaneChain, HyperlaneDomain, HyperlaneProvider, TxnInfo, H256, + BlockInfo, ChainResult, HyperlaneChain, HyperlaneDomain, HyperlaneProvider, TxnInfo, H256, U256, }; /// A wrapper around a Sealevel provider to get generic blockchain information. @@ -43,4 +43,8 @@ impl HyperlaneProvider for SealevelProvider { // FIXME Ok(true) } + + async fn get_balance(&self, _address: String) -> ChainResult { + todo!() // FIXME + } } diff --git a/rust/ethers-prometheus/Cargo.toml b/rust/ethers-prometheus/Cargo.toml index 73dc90bd23..92e304d6fa 100644 --- a/rust/ethers-prometheus/Cargo.toml +++ b/rust/ethers-prometheus/Cargo.toml @@ -26,6 +26,7 @@ tokio = { workspace = true, features = ["time", "sync", "parking_lot"] } # enable feature for this crate that is imported by ethers-rs primitive-types = { workspace = true, features = ["fp-conversion"] } +hyperlane-core = { path = "../hyperlane-core", features = ["agent", "float"] } [build-dependencies] abigen = { path = "../utils/abigen", features = ["ethers"] } diff --git a/rust/ethers-prometheus/src/lib.rs b/rust/ethers-prometheus/src/lib.rs index 26a50ecb8b..8cf57329f0 100644 --- a/rust/ethers-prometheus/src/lib.rs +++ b/rust/ethers-prometheus/src/lib.rs @@ -3,8 +3,6 @@ #![forbid(unsafe_code)] #![warn(missing_docs)] -use ethers::prelude::U256; - mod contracts; pub mod json_rpc_client; @@ -19,8 +17,3 @@ pub struct ChainInfo { /// "kovan". pub name: Option, } - -/// Convert a u256 scaled integer value into the corresponding f64 value. -fn u256_as_scaled_f64(value: U256, decimals: u8) -> f64 { - value.to_f64_lossy() / (10u64.pow(decimals as u32) as f64) -} diff --git a/rust/ethers-prometheus/src/middleware/mod.rs b/rust/ethers-prometheus/src/middleware/mod.rs index 18db31d764..c63447484e 100644 --- a/rust/ethers-prometheus/src/middleware/mod.rs +++ b/rust/ethers-prometheus/src/middleware/mod.rs @@ -14,44 +14,21 @@ use ethers::abi::AbiEncode; use ethers::prelude::*; use ethers::types::transaction::eip2718::TypedTransaction; use ethers::utils::hex::ToHex; -use log::{debug, trace, warn}; +use hyperlane_core::metrics::agent::u256_as_scaled_f64; +use hyperlane_core::HyperlaneDomainProtocol; +use log::{debug, trace}; use maplit::hashmap; use prometheus::{CounterVec, GaugeVec, IntCounterVec, IntGaugeVec}; use static_assertions::assert_impl_all; use tokio::sync::RwLock; -use tokio::time::MissedTickBehavior; pub use error::PrometheusMiddlewareError; +use tokio::time::MissedTickBehavior; -use crate::contracts::erc_20::Erc20; -use crate::u256_as_scaled_f64; pub use crate::ChainInfo; mod error; -/// Some basic information about a token. -#[derive(Clone, Debug)] -#[cfg_attr(feature = "serde", derive(serde::Deserialize))] -#[cfg_attr(feature = "serde", serde(tag = "type", rename_all = "camelCase"))] -pub struct TokenInfo { - /// Full name of the token. E.g. Ether. - pub name: String, - /// Token symbol. E.g. ETH. - pub symbol: String, - /// Number of - pub decimals: u8, -} - -impl Default for TokenInfo { - fn default() -> Self { - Self { - name: "Unknown".into(), - symbol: "".into(), - decimals: 18, - } - } -} - /// Some basic information about a wallet. #[derive(Clone, Debug)] #[cfg_attr(feature = "serde", derive(serde::Deserialize))] @@ -148,18 +125,6 @@ pub const TRANSACTION_SEND_TOTAL_LABELS: &[&str] = /// Help string for the metric. pub const TRANSACTION_SEND_TOTAL_HELP: &str = "Number of transactions sent"; -/// Expected label names for the `wallet_balance` metric. -pub const WALLET_BALANCE_LABELS: &[&str] = &[ - "chain", - "wallet_address", - "wallet_name", - "token_address", - "token_symbol", - "token_name", -]; -/// Help string for the metric. -pub const WALLET_BALANCE_HELP: &str = "Current balance of eth and other tokens in the `tokens` map for the wallet addresses in the `wallets` set"; - /// Container for all the relevant middleware metrics. #[derive(Clone, Builder)] pub struct MiddlewareMetrics { @@ -238,24 +203,12 @@ pub struct MiddlewareMetrics { /// - `txn_status`: `dispatched`, `completed`, or `failed` #[builder(setter(into, strip_option), default)] transaction_send_total: Option, - // /// Gas spent on completed transactions. // /// - `chain`: the chain name (or ID if the name is unknown) of the chain the tx occurred // on. /// - `address_from`: source address of the transaction. // /// - `address_to`: destination address of the transaction. // #[builder(setter(into, strip_option), default)] // transaction_send_gas_eth_total: Option, - /// Current balance of eth and other tokens in the `tokens` map for the - /// wallet addresses in the `wallets` set. - /// - `chain`: the chain name (or chain ID if the name is unknown) of the - /// chain the tx occurred on. - /// - `wallet_address`: Address of the wallet holding the funds. - /// - `wallet_name`: Name of the address holding the funds. - /// - `token_address`: Address of the token. - /// - `token_symbol`: Symbol of the token. - /// - `token_name`: Full name of the token. - #[builder(setter(into, strip_option), default)] - wallet_balance: Option, } /// An ethers-rs middleware that instruments calls with prometheus metrics. To @@ -273,14 +226,6 @@ pub struct PrometheusMiddleware { #[cfg_attr(feature = "serde", derive(serde::Deserialize))] #[cfg_attr(feature = "serde", serde(tag = "type", rename_all = "camelCase"))] pub struct PrometheusMiddlewareConf { - /// The tokens to track and identifying info - #[cfg_attr(feature = "serde", serde(default))] - pub tokens: HashMap, - - /// The wallets to track and identifying info - #[cfg_attr(feature = "serde", serde(default))] - pub wallets: HashMap, - /// Contract info for more useful metrics #[cfg_attr(feature = "serde", serde(default))] pub contracts: HashMap, @@ -521,32 +466,6 @@ impl PrometheusMiddleware { conf: Arc::new(RwLock::new(conf)), } } - - /// Start tracking metrics for a new token. - pub async fn track_new_token(&self, addr: Address, info: TokenInfo) { - self.track_new_tokens([(addr, info)]).await; - } - - /// Start tacking metrics for new tokens. - pub async fn track_new_tokens(&self, iter: impl IntoIterator) { - let mut data = self.conf.write().await; - for (addr, info) in iter { - data.tokens.insert(addr, info); - } - } - - /// Start tracking metrics for a new wallet. - pub async fn track_new_wallet(&self, addr: Address, info: WalletInfo) { - self.track_new_wallets([(addr, info)]).await; - } - - /// Start tracking metrics for new wallets. - pub async fn track_new_wallets(&self, iter: impl IntoIterator) { - let mut data = self.conf.write().await; - for (addr, info) in iter { - data.wallets.insert(addr, info); - } - } } impl PrometheusMiddleware { @@ -580,7 +499,6 @@ impl PrometheusMiddleware { /// prometheus scrape interval. pub fn update(&self) -> impl Future { // all metrics are Arcs internally so just clone the ones we want to report for. - let wallet_balance = self.metrics.wallet_balance.clone(); let block_height = self.metrics.block_height.clone(); let gas_price_gwei = self.metrics.gas_price_gwei.clone(); @@ -595,9 +513,6 @@ impl PrometheusMiddleware { if block_height.is_some() || gas_price_gwei.is_some() { Self::update_block_details(&*client, chain, block_height, gas_price_gwei).await; } - if let Some(wallet_balance) = wallet_balance { - Self::update_wallet_balances(client.clone(), &data, chain, wallet_balance).await; - } // more metrics to come... } @@ -609,9 +524,7 @@ impl PrometheusMiddleware { block_height: Option, gas_price_gwei: Option, ) { - let current_block = if let Ok(Some(b)) = client.get_block(BlockNumber::Latest).await { - b - } else { + let Ok(Some(current_block)) = client.get_block(BlockNumber::Latest).await else { return; }; @@ -627,7 +540,8 @@ impl PrometheusMiddleware { } if let Some(gas_price_gwei) = gas_price_gwei { if let Some(london_fee) = current_block.base_fee_per_gas { - let gas = u256_as_scaled_f64(london_fee, 18) * 1e9; + let gas = + u256_as_scaled_f64(london_fee.into(), HyperlaneDomainProtocol::Ethereum) * 1e9; trace!("Gas price for chain {chain} is {gas:.1}gwei"); gas_price_gwei.with(&hashmap! { "chain" => chain }).set(gas); } else { @@ -635,63 +549,6 @@ impl PrometheusMiddleware { } } } - - async fn update_wallet_balances( - client: Arc, - data: &PrometheusMiddlewareConf, - chain: &str, - wallet_balance_metric: GaugeVec, - ) { - for (wallet_addr, wallet_info) in data.wallets.iter() { - let wallet_addr_str: String = wallet_addr.encode_hex(); - let wallet_name = wallet_info.name.as_deref().unwrap_or("none"); - - match client.get_balance(*wallet_addr, None).await { - Ok(balance) => { - // Okay, so the native type is not a token, but whatever, close enough. - // Note: This is ETH for many chains, but not all so that is why we use `N` and `Native` - // TODO: can we get away with scaling as 18 in all cases here? I am guessing not. - let balance = u256_as_scaled_f64(balance, 18); - trace!("Wallet {wallet_name} ({wallet_addr_str}) on chain {chain} balance is {balance} of the native currency"); - wallet_balance_metric - .with(&hashmap! { - "chain" => chain, - "wallet_address" => wallet_addr_str.as_str(), - "wallet_name" => wallet_name, - "token_address" => "none", - "token_symbol" => "Native", - "token_name" => "Native" - }).set(balance) - }, - Err(e) => warn!("Metric update failed for wallet {wallet_name} ({wallet_addr_str}) on chain {chain} balance for native currency; {e}") - } - for (token_addr, token) in data.tokens.iter() { - let token_addr_str: String = token_addr.encode_hex(); - let balance = match Erc20::new(*token_addr, client.clone()) - .balance_of(*wallet_addr) - .call() - .await - { - Ok(b) => u256_as_scaled_f64(b, token.decimals), - Err(e) => { - warn!("Metric update failed for wallet {wallet_name} ({wallet_addr_str}) on chain {chain} balance for {name}; {e}", name=token.name); - continue; - } - }; - trace!("Wallet {wallet_name} ({wallet_addr_str}) on chain {chain} balance is {balance}{}", token.symbol); - wallet_balance_metric - .with(&hashmap! { - "chain" => chain, - "wallet_address" => wallet_addr_str.as_str(), - "wallet_name" => wallet_name, - "token_address" => token_addr_str.as_str(), - "token_symbol" => token.symbol.as_str(), - "token_name" => token.symbol.as_str() - }) - .set(balance); - } - } - } } impl Debug for PrometheusMiddleware { diff --git a/rust/hyperlane-base/Cargo.toml b/rust/hyperlane-base/Cargo.toml index 02d870e644..4e78c30243 100644 --- a/rust/hyperlane-base/Cargo.toml +++ b/rust/hyperlane-base/Cargo.toml @@ -15,6 +15,7 @@ bs58.workspace = true color-eyre = { workspace = true, optional = true } config.workspace = true convert_case.workspace = true +derive_builder.workspace = true derive-new.workspace = true ed25519-dalek.workspace = true ethers.workspace = true @@ -22,6 +23,7 @@ eyre.workspace = true fuels.workspace = true futures-util.workspace = true itertools.workspace = true +maplit.workspace = true paste.workspace = true prometheus.workspace = true rocksdb.workspace = true diff --git a/rust/hyperlane-base/src/agent.rs b/rust/hyperlane-base/src/agent.rs index 540a32254c..5df9250889 100644 --- a/rust/hyperlane-base/src/agent.rs +++ b/rust/hyperlane-base/src/agent.rs @@ -7,7 +7,10 @@ use hyperlane_core::config::*; use tokio::task::JoinHandle; use tracing::{debug_span, instrument::Instrumented, Instrument}; -use crate::{metrics::CoreMetrics, settings::Settings}; +use crate::{ + metrics::{create_agent_metrics, AgentMetrics, CoreMetrics}, + settings::Settings, +}; /// Properties shared across all hyperlane agents #[derive(Debug)] @@ -36,7 +39,11 @@ pub trait BaseAgent: Send + Sync + Debug { type Settings: LoadableFromSettings; /// Instantiate the agent from the standard settings object - async fn from_settings(settings: Self::Settings, metrics: Arc) -> Result + async fn from_settings( + settings: Self::Settings, + metrics: Arc, + agent_metrics: AgentMetrics, + ) -> Result where Self: Sized; @@ -68,7 +75,8 @@ pub async fn agent_main() -> Result<()> { let metrics = settings.as_ref().metrics(A::AGENT_NAME)?; core_settings.tracing.start_tracing(&metrics)?; - let agent = A::from_settings(settings, metrics.clone()).await?; + let agent_metrics = create_agent_metrics(&metrics)?; + let agent = A::from_settings(settings, metrics.clone(), agent_metrics).await?; metrics.run_http_server(); agent.run().await.await? diff --git a/rust/hyperlane-base/src/lib.rs b/rust/hyperlane-base/src/lib.rs index eeb3e58c23..b4a0b1cf9c 100644 --- a/rust/hyperlane-base/src/lib.rs +++ b/rust/hyperlane-base/src/lib.rs @@ -12,7 +12,7 @@ pub mod settings; mod agent; pub use agent::*; -mod metrics; +pub mod metrics; pub use metrics::*; mod contract_sync; diff --git a/rust/hyperlane-base/src/metrics/agent_metrics.rs b/rust/hyperlane-base/src/metrics/agent_metrics.rs new file mode 100644 index 0000000000..ab4e01f513 --- /dev/null +++ b/rust/hyperlane-base/src/metrics/agent_metrics.rs @@ -0,0 +1,120 @@ +use std::time::Duration; + +use derive_builder::Builder; +use derive_new::new; +use eyre::Result; +use hyperlane_core::metrics::agent::u256_as_scaled_f64; +use hyperlane_core::HyperlaneDomain; +use hyperlane_core::HyperlaneProvider; +use maplit::hashmap; +use prometheus::GaugeVec; +use tokio::time::MissedTickBehavior; +use tracing::{trace, warn}; + +use crate::CoreMetrics; + +/// Expected label names for the `wallet_balance` metric. +pub const WALLET_BALANCE_LABELS: &[&str] = &[ + "chain", + "wallet_address", + "wallet_name", + "token_address", + "token_symbol", + "token_name", +]; +/// Help string for the metric. +pub const WALLET_BALANCE_HELP: &str = + "Current native token balance for the wallet addresses in the `wallets` set"; + +/// Agent-specific metrics +#[derive(Clone, Builder)] +pub struct AgentMetrics { + /// Current balance of native tokens for the + /// wallet address. + /// - `chain`: the chain name (or chain ID if the name is unknown) of the + /// chain the tx occurred on. + /// - `wallet_address`: Address of the wallet holding the funds. + /// - `wallet_name`: Name of the address holding the funds. + /// - `token_address`: Address of the token. + /// - `token_symbol`: Symbol of the token. + /// - `token_name`: Full name of the token. + #[builder(setter(into, strip_option), default)] + wallet_balance: Option, +} + +pub(crate) fn create_agent_metrics(metrics: &CoreMetrics) -> Result { + Ok(AgentMetricsBuilder::default() + .wallet_balance(metrics.new_gauge( + "wallet_balance", + WALLET_BALANCE_HELP, + WALLET_BALANCE_LABELS, + )?) + .build()?) +} + +/// Configuration for the prometheus middleware. This can be loaded via serde. +#[derive(Clone, Debug)] +#[cfg_attr(feature = "serde", derive(serde::Deserialize))] +#[cfg_attr(feature = "serde", serde(tag = "type", rename_all = "camelCase"))] +pub struct AgentMetricsConf { + /// The account to track + #[cfg_attr(feature = "serde", serde(default))] + pub address: Option, + + /// Information about the chain this metric is for + pub domain: HyperlaneDomain, + + /// Name of the agent the metrics are about + pub name: String, +} + +/// Utility struct to update agent metrics for a given chain +#[derive(new)] +pub struct AgentMetricsUpdater { + metrics: AgentMetrics, + conf: AgentMetricsConf, + provider: Box, +} + +impl AgentMetricsUpdater { + async fn update_wallet_balances(&self) { + let Some(wallet_addr) = self.conf.address.clone() else { + return; + }; + let wallet_name = self.conf.name.clone(); + let Some(wallet_balance_metric) = self.metrics.wallet_balance.clone() else { + return; + }; + let chain = self.conf.domain.name(); + + match self.provider.get_balance(wallet_addr.clone()).await { + Ok(balance) => { + // Okay, so the native type is not a token, but whatever, close enough. + // Note: This is ETH for many chains, but not all so that is why we use `N` and `Native` + // TODO: can we get away with scaling as 18 in all cases here? I am guessing not. + let balance = u256_as_scaled_f64(balance, self.conf.domain.domain_protocol()); + trace!("Wallet {wallet_name} ({wallet_addr}) on chain {chain} balance is {balance} of the native currency"); + wallet_balance_metric + .with(&hashmap! { + "chain" => chain, + "wallet_address" => wallet_addr.as_str(), + "wallet_name" => wallet_name.as_str(), + "token_address" => "none", + "token_symbol" => "Native", + "token_name" => "Native" + }).set(balance) + }, + Err(e) => warn!("Metric update failed for wallet {wallet_name} ({wallet_addr}) on chain {chain} balance for native currency; {e}") + } + } + + /// Periodically updates the metrics + pub async fn start_updating_on_interval(self, period: Duration) { + let mut interval = tokio::time::interval(period); + interval.set_missed_tick_behavior(MissedTickBehavior::Skip); + loop { + self.update_wallet_balances().await; + interval.tick().await; + } + } +} diff --git a/rust/hyperlane-base/src/metrics/mod.rs b/rust/hyperlane-base/src/metrics/mod.rs index ff30be6dc7..b2b1c6acd9 100644 --- a/rust/hyperlane-base/src/metrics/mod.rs +++ b/rust/hyperlane-base/src/metrics/mod.rs @@ -1,10 +1,14 @@ //! Useful metrics that all agents should track. +pub use self::core::*; + /// The metrics namespace prefix. All metric names will start with `{NAMESPACE}_`. pub const NAMESPACE: &str = "hyperlane"; mod core; -pub use self::core::*; +mod agent_metrics; mod json_rpc_client; mod provider; + +pub use self::agent_metrics::*; diff --git a/rust/hyperlane-base/src/metrics/provider.rs b/rust/hyperlane-base/src/metrics/provider.rs index 86a9fd5602..54def51ae0 100644 --- a/rust/hyperlane-base/src/metrics/provider.rs +++ b/rust/hyperlane-base/src/metrics/provider.rs @@ -46,10 +46,5 @@ pub(crate) fn create_provider_metrics(metrics: &CoreMetrics) -> Result todo!(), ChainConnectionConf::Sealevel(_) => todo!(), - ChainConnectionConf::Cosmos(_) => todo!(), + ChainConnectionConf::Cosmos(conf) => { + let locator = self.locator(H256::zero()); + let provider = CosmosProvider::new( + locator.domain.clone(), + conf.clone(), + Some(locator.clone()), + None, + )?; + Ok(Box::new(provider) as Box) + } } .context(ctx) } @@ -639,13 +650,19 @@ impl ChainConf { self.signer().await } + /// Try to build an agent metrics configuration from the chain config + pub async fn agent_metrics_conf(&self, agent_name: String) -> Result { + let chain_signer_address = self.chain_signer().await?.map(|s| s.address_string()); + Ok(AgentMetricsConf { + address: chain_signer_address, + domain: self.domain.clone(), + name: agent_name, + }) + } + /// Get a clone of the ethereum metrics conf with correctly configured /// contract information. - fn metrics_conf( - &self, - agent_name: &str, - signer: &Option, - ) -> PrometheusMiddlewareConf { + pub fn metrics_conf(&self) -> PrometheusMiddlewareConf { let mut cfg = self.metrics_conf.clone(); if cfg.chain.is_none() { @@ -654,14 +671,6 @@ impl ChainConf { }); } - if let Some(signer) = signer { - cfg.wallets - .entry(signer.eth_address().into()) - .or_insert_with(|| WalletInfo { - name: Some(agent_name.into()), - }); - } - let mut register_contract = |name: &str, address: H256, fns: HashMap, String>| { cfg.contracts .entry(address.into()) @@ -718,7 +727,7 @@ impl ChainConf { B: BuildableWithProvider + Sync, { let signer = self.ethereum_signer().await?; - let metrics_conf = self.metrics_conf(metrics.agent_name(), &signer); + let metrics_conf = self.metrics_conf(); let rpc_metrics = Some(metrics.json_rpc_client_metrics()); let middleware_metrics = Some((metrics.provider_metrics(), metrics_conf)); let res = builder diff --git a/rust/hyperlane-base/src/settings/signers.rs b/rust/hyperlane-base/src/settings/signers.rs index 2979488544..c48de65f73 100644 --- a/rust/hyperlane-base/src/settings/signers.rs +++ b/rust/hyperlane-base/src/settings/signers.rs @@ -3,6 +3,7 @@ use std::time::Duration; use async_trait::async_trait; use ed25519_dalek::SecretKey; use ethers::prelude::{AwsSigner, LocalWallet}; +use ethers::utils::hex::ToHex; use eyre::{bail, Context, Report}; use hyperlane_core::H256; use hyperlane_sealevel::Keypair; @@ -96,7 +97,7 @@ impl BuildableWithSignerConf for hyperlane_ethereum::Signers { impl ChainSigner for hyperlane_ethereum::Signers { fn address_string(&self) -> String { - ethers::abi::AbiEncode::encode_hex(ethers::signers::Signer::address(self)) + ethers::signers::Signer::address(self).encode_hex() } } diff --git a/rust/hyperlane-core/src/chain.rs b/rust/hyperlane-core/src/chain.rs index ad28765a5f..8f2f37db98 100644 --- a/rust/hyperlane-core/src/chain.rs +++ b/rust/hyperlane-core/src/chain.rs @@ -5,6 +5,7 @@ use std::{ hash::{Hash, Hasher}, }; +use derive_new::new; use num_derive::FromPrimitive; use num_traits::FromPrimitive; #[cfg(feature = "strum")] @@ -18,7 +19,7 @@ pub struct Address(pub bytes::Bytes); #[derive(Debug, Clone)] pub struct Balance(pub num::BigInt); -#[derive(Debug, Clone)] +#[derive(Debug, Clone, new)] pub struct ContractLocator<'a> { pub domain: &'a HyperlaneDomain, pub address: H256, diff --git a/rust/hyperlane-core/src/lib.rs b/rust/hyperlane-core/src/lib.rs index 0e8349186b..6834df3951 100644 --- a/rust/hyperlane-core/src/lib.rs +++ b/rust/hyperlane-core/src/lib.rs @@ -26,6 +26,8 @@ pub mod utils; pub mod test_utils; pub mod config; +/// Prometheus metrics traits / utilities +pub mod metrics; /// Core hyperlane system data structures mod types; diff --git a/rust/hyperlane-core/src/metrics/agent.rs b/rust/hyperlane-core/src/metrics/agent.rs new file mode 100644 index 0000000000..02c2795e59 --- /dev/null +++ b/rust/hyperlane-core/src/metrics/agent.rs @@ -0,0 +1,29 @@ +use crate::HyperlaneDomainProtocol; +use std::time::Duration; + +use crate::U256; + +const ETHEREUM_DECIMALS: u8 = 18; +const COSMOS_DECIMALS: u8 = 6; +const SOLANA_DECIMALS: u8 = 9; + +/// Interval for querying the prometheus metrics endpoint. +/// This should be whatever the prometheus scrape interval is +pub const METRICS_SCRAPE_INTERVAL: Duration = Duration::from_secs(60); + +/// Convert a u256 scaled integer value into the corresponding f64 value. +#[cfg(feature = "float")] +pub fn u256_as_scaled_f64(value: U256, domain: HyperlaneDomainProtocol) -> f64 { + let decimals = decimals_by_protocol(domain); + value.to_f64_lossy() / (10u64.pow(decimals as u32) as f64) +} + +/// Get the decimals each protocol typically uses for its lowest denomination +/// of the native token +pub fn decimals_by_protocol(protocol: HyperlaneDomainProtocol) -> u8 { + match protocol { + HyperlaneDomainProtocol::Cosmos => COSMOS_DECIMALS, + HyperlaneDomainProtocol::Sealevel => SOLANA_DECIMALS, + _ => ETHEREUM_DECIMALS, + } +} diff --git a/rust/hyperlane-core/src/metrics/mod.rs b/rust/hyperlane-core/src/metrics/mod.rs new file mode 100644 index 0000000000..4c82be31d7 --- /dev/null +++ b/rust/hyperlane-core/src/metrics/mod.rs @@ -0,0 +1,2 @@ +/// Agent metrics utils +pub mod agent; diff --git a/rust/hyperlane-core/src/traits/provider.rs b/rust/hyperlane-core/src/traits/provider.rs index 3f00e7a4ac..7b2c930926 100644 --- a/rust/hyperlane-core/src/traits/provider.rs +++ b/rust/hyperlane-core/src/traits/provider.rs @@ -4,7 +4,7 @@ use async_trait::async_trait; use auto_impl::auto_impl; use thiserror::Error; -use crate::{BlockInfo, ChainResult, HyperlaneChain, TxnInfo, H256}; +use crate::{BlockInfo, ChainResult, HyperlaneChain, TxnInfo, H256, U256}; /// Interface for a provider. Allows abstraction over different provider types /// for different chains. @@ -24,6 +24,9 @@ pub trait HyperlaneProvider: HyperlaneChain + Send + Sync + Debug { /// Returns whether a contract exists at the provided address async fn is_contract(&self, address: &H256) -> ChainResult; + + /// Fetch the balance of the wallet address associated with the chain provider. + async fn get_balance(&self, address: String) -> ChainResult; } /// Errors when querying for provider information. diff --git a/rust/utils/run-locally/Cargo.toml b/rust/utils/run-locally/Cargo.toml index 1f59fb2fb3..66e86a92bc 100644 --- a/rust/utils/run-locally/Cargo.toml +++ b/rust/utils/run-locally/Cargo.toml @@ -10,7 +10,7 @@ publish.workspace = true version.workspace = true [dependencies] -hyperlane-core = { path = "../../hyperlane-core" } +hyperlane-core = { path = "../../hyperlane-core", features = ["float"]} toml_edit.workspace = true k256.workspace = true ripemd.workspace = true diff --git a/rust/utils/run-locally/src/cosmos/mod.rs b/rust/utils/run-locally/src/cosmos/mod.rs index e60b98e01f..28c9515137 100644 --- a/rust/utils/run-locally/src/cosmos/mod.rs +++ b/rust/utils/run-locally/src/cosmos/mod.rs @@ -7,6 +7,7 @@ use std::{env, fs}; use cosmwasm_schema::cw_serde; use hpl_interface::types::bech32_decode; use macro_rules_attribute::apply; +use maplit::hashmap; use tempfile::tempdir; mod cli; @@ -24,16 +25,17 @@ use utils::*; use crate::cosmos::link::link_networks; use crate::logging::log; +use crate::metrics::agent_balance_sum; use crate::program::Program; use crate::utils::{as_task, concat_path, stop_child, AgentHandles, TaskHandle}; -use crate::AGENT_BIN_PATH; +use crate::{fetch_metric, AGENT_BIN_PATH}; use cli::{OsmosisCLI, OsmosisEndpoint}; use self::deploy::deploy_cw_hyperlane; use self::source::{CLISource, CodeSource}; const OSMOSIS_CLI_GIT: &str = "https://github.com/osmosis-labs/osmosis"; -const OSMOSIS_CLI_VERSION: &str = "19.0.0"; +const OSMOSIS_CLI_VERSION: &str = "20.5.0"; const KEY_HPL_VALIDATOR: (&str,&str) = ("hpl-validator", "guard evolve region sentence danger sort despair eye deputy brave trim actor left recipe debate document upgrade sustain bus cage afford half demand pigeon"); const KEY_HPL_RELAYER: (&str,&str) = ("hpl-relayer", "moral item damp melt gloom vendor notice head assume balance doctor retire fashion trim find biology saddle undo switch fault cattle toast drip empty"); @@ -257,7 +259,6 @@ fn launch_cosmos_validator( .hyp_env("CHECKPOINTSYNCER_PATH", checkpoint_path.to_str().unwrap()) .hyp_env("CHECKPOINTSYNCER_TYPE", "localStorage") .hyp_env("ORIGINCHAINNAME", agent_config.name) - .hyp_env("REORGPERIOD", "100") .hyp_env("DB", validator_base_db.to_str().unwrap()) .hyp_env("METRICSPORT", agent_config.metrics_port.to_string()) .hyp_env("VALIDATOR_SIGNER_TYPE", agent_config.signer.typ) @@ -287,7 +288,6 @@ fn launch_cosmos_relayer( .env("CONFIG_FILES", agent_config_path.to_str().unwrap()) .env("RUST_BACKTRACE", "1") .hyp_env("RELAYCHAINS", relay_chains.join(",")) - .hyp_env("REORGPERIOD", "100") .hyp_env("DB", relayer_base.as_ref().to_str().unwrap()) .hyp_env("ALLOWLOCALCHECKPOINTSYNCERS", "true") .hyp_env("TRACING_LEVEL", if debug { "debug" } else { "info" }) @@ -460,6 +460,11 @@ fn run_locally() { debug, ); + // give things a chance to fully start. + sleep(Duration::from_secs(10)); + + let starting_relayer_balance: f64 = agent_balance_sum(hpl_rly_metrics_port).unwrap(); + // dispatch messages let mut dispatched_messages = 0; @@ -517,12 +522,16 @@ fn run_locally() { // Mostly copy-pasta from `rust/utils/run-locally/src/main.rs` // TODO: refactor to share code let loop_start = Instant::now(); - // give things a chance to fully start. - sleep(Duration::from_secs(5)); let mut failure_occurred = false; loop { // look for the end condition. - if termination_invariants_met(hpl_rly_metrics_port, dispatched_messages).unwrap_or(false) { + if termination_invariants_met( + hpl_rly_metrics_port, + dispatched_messages, + starting_relayer_balance, + ) + .unwrap_or(false) + { // end condition reached successfully break; } else if (Instant::now() - loop_start).as_secs() > TIMEOUT_SECS { @@ -542,44 +551,62 @@ fn run_locally() { } } -fn termination_invariants_met(_metrics_port: u32, _messages_expected: u32) -> eyre::Result { +fn termination_invariants_met( + relayer_metrics_port: u32, + messages_expected: u32, + starting_relayer_balance: f64, +) -> eyre::Result { + let gas_payments_scraped = fetch_metric( + &relayer_metrics_port.to_string(), + "hyperlane_contract_sync_stored_events", + &hashmap! {"data_type" => "gas_payment"}, + )? + .iter() + .sum::(); + let expected_gas_payments = messages_expected; + if gas_payments_scraped != expected_gas_payments { + log!( + "Scraper has scraped {} gas payments, expected {}", + gas_payments_scraped, + expected_gas_payments + ); + return Ok(false); + } + + let delivered_messages_scraped = fetch_metric( + &relayer_metrics_port.to_string(), + "hyperlane_operations_processed_count", + &hashmap! {"phase" => "confirmed"}, + )? + .iter() + .sum::(); + if delivered_messages_scraped != messages_expected { + log!( + "Relayer confirmed {} submitted messages, expected {}", + delivered_messages_scraped, + messages_expected + ); + return Ok(false); + } + + let ending_relayer_balance: f64 = agent_balance_sum(relayer_metrics_port).unwrap(); + + // Make sure the balance was correctly updated in the metrics. + // Ideally, make sure that the difference is >= gas_per_tx * gas_cost, set here: + // https://github.com/hyperlane-xyz/hyperlane-monorepo/blob/c2288eb31734ba1f2f997e2c6ecb30176427bc2c/rust/utils/run-locally/src/cosmos/cli.rs#L55 + // What's stopping this is that the format returned by the `uosmo` balance query is a surprisingly low number (0.000003999999995184) + // but then maybe the gas_per_tx is just very low - how can we check that? (maybe by simulating said tx) + if starting_relayer_balance <= ending_relayer_balance { + log!( + "Expected starting relayer balance to be greater than ending relayer balance, but got {} <= {}", + starting_relayer_balance, + ending_relayer_balance + ); + return Ok(false); + } + + log!("Termination invariants have been meet"); Ok(true) - // TODO: uncomment once CI passes consistently on Ubuntu - // let gas_payments_scraped = fetch_metric( - // "9093", - // "hyperlane_contract_sync_stored_events", - // &hashmap! {"data_type" => "gas_payment"}, - // )? - // .iter() - // .sum::(); - // let expected_gas_payments = messages_expected; - // if gas_payments_scraped != expected_gas_payments { - // log!( - // "Scraper has scraped {} gas payments, expected {}", - // gas_payments_scraped, - // expected_gas_payments - // ); - // return Ok(false); - // } - - // let delivered_messages_scraped = fetch_metric( - // "9093", - // "hyperlane_operations_processed_count", - // &hashmap! {"phase" => "confirmed"}, - // )? - // .iter() - // .sum::(); - // if delivered_messages_scraped != messages_expected { - // log!( - // "Relayer confirmed {} submitted messages, expected {}", - // delivered_messages_scraped, - // messages_expected - // ); - // return Ok(false); - // } - - // log!("Termination invariants have been meet"); - // Ok(true) } #[cfg(test)] diff --git a/rust/utils/run-locally/src/cosmos/types.rs b/rust/utils/run-locally/src/cosmos/types.rs index 138cd3522d..f50986b600 100644 --- a/rust/utils/run-locally/src/cosmos/types.rs +++ b/rust/utils/run-locally/src/cosmos/types.rs @@ -119,7 +119,6 @@ pub struct AgentConfig { pub validator_announce: String, pub merkle_tree_hook: String, pub protocol: String, - pub finality_blocks: u32, pub chain_id: String, pub rpc_urls: Vec, pub grpc_url: String, @@ -151,7 +150,6 @@ impl AgentConfig { validator_announce: to_hex_addr(&network.deployments.va), merkle_tree_hook: to_hex_addr(&network.deployments.hook_merkle), protocol: "cosmos".to_string(), - finality_blocks: 1, chain_id: format!("cosmos-test-{}", network.domain), rpc_urls: vec![AgentUrl { http: format!( diff --git a/rust/utils/run-locally/src/invariants.rs b/rust/utils/run-locally/src/invariants.rs index f1fb725959..6fe857a436 100644 --- a/rust/utils/run-locally/src/invariants.rs +++ b/rust/utils/run-locally/src/invariants.rs @@ -1,6 +1,7 @@ // use std::path::Path; use crate::config::Config; +use crate::metrics::agent_balance_sum; use maplit::hashmap; use crate::logging::log; @@ -15,6 +16,7 @@ pub const SOL_MESSAGES_EXPECTED: u32 = 0; /// number of messages have been sent. pub fn termination_invariants_met( config: &Config, + starting_relayer_balance: f64, // solana_cli_tools_path: &Path, // solana_config_path: &Path, ) -> eyre::Result { @@ -129,6 +131,17 @@ pub fn termination_invariants_met( return Ok(false); } + let ending_relayer_balance: f64 = agent_balance_sum(9092).unwrap(); + // Make sure the balance was correctly updated in the metrics. + if starting_relayer_balance <= ending_relayer_balance { + log!( + "Expected starting relayer balance to be greater than ending relayer balance, but got {} <= {}", + starting_relayer_balance, + ending_relayer_balance + ); + return Ok(false); + } + log!("Termination invariants have been meet"); Ok(true) } diff --git a/rust/utils/run-locally/src/main.rs b/rust/utils/run-locally/src/main.rs index 0ee63b02d4..52d56ed5d4 100644 --- a/rust/utils/run-locally/src/main.rs +++ b/rust/utils/run-locally/src/main.rs @@ -30,6 +30,7 @@ use crate::{ config::Config, ethereum::start_anvil, invariants::termination_invariants_met, + metrics::agent_balance_sum, solana::*, utils::{concat_path, make_static, stop_child, AgentHandles, ArbitraryData, TaskHandle}, }; @@ -388,12 +389,13 @@ fn main() -> ExitCode { let loop_start = Instant::now(); // give things a chance to fully start. - sleep(Duration::from_secs(5)); + sleep(Duration::from_secs(10)); let mut failure_occurred = false; + let starting_relayer_balance: f64 = agent_balance_sum(9092).unwrap(); while !SHUTDOWN.load(Ordering::Relaxed) { if config.ci_mode { // for CI we have to look for the end condition. - if termination_invariants_met(&config) + if termination_invariants_met(&config, starting_relayer_balance) // if termination_invariants_met(&config, &solana_path, &solana_config_path) .unwrap_or(false) { diff --git a/rust/utils/run-locally/src/metrics.rs b/rust/utils/run-locally/src/metrics.rs index 3ee1f17146..aad0f626d9 100644 --- a/rust/utils/run-locally/src/metrics.rs +++ b/rust/utils/run-locally/src/metrics.rs @@ -1,8 +1,14 @@ -use std::collections::HashMap; +use std::{collections::HashMap, error::Error as StdError, str::FromStr}; -use eyre::{eyre, Result}; +use eyre::{eyre, ErrReport, Result}; +use maplit::hashmap; -pub fn fetch_metric(port: &str, metric: &str, labels: &HashMap<&str, &str>) -> Result> { +/// Fetch a prometheus format metric, filtering by labels. +pub fn fetch_metric(port: &str, metric: &str, labels: &HashMap<&str, &str>) -> Result> +where + T: FromStr, + E: Into + StdError + Send + Sync + 'static, +{ let resp = ureq::get(&format!("http://127.0.0.1:{}/metrics", port)); resp.call()? .into_string()? @@ -16,10 +22,19 @@ pub fn fetch_metric(port: &str, metric: &str, labels: &HashMap<&str, &str>) -> R .all(|(k, v)| l.contains(&format!("{k}=\"{v}"))) }) .map(|l| { - Ok(l.rsplit_once(' ') - .ok_or(eyre!("Unknown metric format"))? - .1 - .parse::()?) + let value = l.rsplit_once(' ').ok_or(eyre!("Unknown metric format"))?.1; + Ok(value.parse::()?) }) .collect() } + +pub fn agent_balance_sum(metrics_port: u32) -> eyre::Result { + let balance = fetch_metric( + &metrics_port.to_string(), + "hyperlane_wallet_balance", + &hashmap! {}, + )? + .iter() + .sum(); + Ok(balance) +}