Skip to content

Commit

Permalink
Minimize Tx queue config (#596)
Browse files Browse the repository at this point in the history
* update tx-queue and implement TxQueue config traits

* update type

* update doc
  • Loading branch information
salman01zp authored Dec 14, 2023
1 parent df5a20b commit e71db78
Show file tree
Hide file tree
Showing 8 changed files with 232 additions and 99 deletions.
5 changes: 4 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions crates/relayer-context/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@ repository = { workspace = true }
webb-relayer-config = { workspace = true }
webb-relayer-utils = { workspace = true }
webb-relayer-store = { workspace = true }
webb-relayer-tx-queue = { workspace = true }
webb-relayer-types = { workspace = true }
webb-price-oracle-backends = { workspace = true, features = ["coingecko"] }

async-trait = { workspace = true }
url = { workspace = true }
tracing = { workspace = true }
tokio = { workspace = true }
webb = { workspace = true }
Expand Down
98 changes: 97 additions & 1 deletion crates/relayer-context/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
use std::time::Duration;
use std::{collections::HashMap, sync::Arc};
use tokio::sync::{broadcast, Mutex};
use webb::substrate::subxt::OnlineClient;
use webb_relayer_tx_queue::evm::EvmTxQueueConfig;
use webb_relayer_tx_queue::substrate::SubstrateTxQueueConfig;
use webb_relayer_types::rpc_client::WebbRpcClient;

use webb::evm::ethers;
Expand Down Expand Up @@ -310,7 +313,7 @@ impl RelayerContext {
chain_id: I,
) -> webb_relayer_utils::Result<GasOracleMedian> {
let chain_id: types::U256 = chain_id.into();
let chain_provider = self.evm_provider(chain_id).await?;
let chain_provider = self.evm_provider(&chain_id).await?;
let provider_gas_oracle = ProviderOracle::new(chain_provider);
let mut gas_oracle = GasOracleMedian::new();
// Give only 10% of the weight to the provider gas oracle
Expand All @@ -334,6 +337,99 @@ impl RelayerContext {
}
}

#[cfg(feature = "evm")]
#[async_trait::async_trait]
impl EvmTxQueueConfig for RelayerContext {
type EtherClient = EthersClient;

fn max_sleep_interval(
&self,
chain_id: &U256,
) -> webb_relayer_utils::Result<u64> {
let chain_config = self
.config
.evm
.get(&chain_id.as_u64().to_string())
.ok_or_else(|| webb_relayer_utils::Error::ChainNotFound {
chain_id: chain_id.to_string(),
})?;
Ok(chain_config.tx_queue.max_sleep_interval)
}

fn block_confirmations(
&self,
chain_id: &U256,
) -> webb_relayer_utils::Result<u8> {
let chain_config = self
.config
.evm
.get(&chain_id.as_u64().to_string())
.ok_or_else(|| webb_relayer_utils::Error::ChainNotFound {
chain_id: chain_id.to_string(),
})?;
Ok(chain_config.block_confirmations)
}

fn explorer(
&self,
chain_id: &U256,
) -> webb_relayer_utils::Result<Option<url::Url>> {
let chain_config = self
.config
.evm
.get(&chain_id.as_u64().to_string())
.ok_or_else(|| webb_relayer_utils::Error::ChainNotFound {
chain_id: chain_id.to_string(),
})?;
Ok(chain_config.explorer.clone())
}

async fn get_evm_provider(
&self,
chain_id: &U256,
) -> webb_relayer_utils::Result<Arc<Self::EtherClient>> {
self.evm_provider(chain_id).await
}

async fn get_evm_wallet(
&self,
chain_id: &U256,
) -> webb_relayer_utils::Result<LocalWallet> {
self.evm_wallet(chain_id).await
}
}

#[cfg(feature = "substrate")]
#[async_trait::async_trait]
impl SubstrateTxQueueConfig for RelayerContext {
fn max_sleep_interval(
&self,
chain_id: u32,
) -> webb_relayer_utils::Result<u64> {
let chain_config =
self.config.substrate.get(&chain_id.to_string()).ok_or(
webb_relayer_utils::Error::NodeNotFound {
chain_id: chain_id.to_string(),
},
)?;
Ok(chain_config.tx_queue.max_sleep_interval)
}

async fn substrate_provider<C: subxt::Config>(
&self,
chain_id: u32,
) -> webb_relayer_utils::Result<OnlineClient<C>> {
self.substrate_provider(chain_id).await
}

async fn substrate_wallet(
&self,
chain_id: u32,
) -> webb_relayer_utils::Result<Sr25519Pair> {
self.substrate_wallet(chain_id).await
}
}

/// Listens for the server shutdown signal.
///
/// Shutdown is signalled using a `broadcast::Receiver`. Only a single value is
Expand Down
7 changes: 4 additions & 3 deletions crates/tx-queue/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,16 @@ repository = { workspace = true }
[dependencies]
webb-relayer-types = { workspace = true }
webb-relayer-store = { workspace = true }
webb-relayer-context = { workspace = true }
webb-relayer-utils = { workspace = true }

async-trait = { workspace = true }
tracing = { workspace = true }
sled = { workspace = true }
hex = { workspace = true }
futures = { workspace = true }
backoff = { workspace = true }
tokio = { workspace = true }
url = { workspace = true }
webb = { workspace = true }
subxt-signer = { workspace = true, optional = true }
# Used by ethers (but we need it to be vendored with the lib).
Expand All @@ -34,8 +35,8 @@ rand = { workspace = true, default-features = false, features = ["getrandom"] }
[features]
default = ["std", "evm", "substrate"]
std = []
evm = ["webb-relayer-context/evm"]
substrate = ["webb-relayer-context/substrate", "subxt-signer"]
evm = []
substrate = ["subxt-signer"]

[dev-dependencies]
webb-relayer-config = { workspace = true }
Expand Down
57 changes: 20 additions & 37 deletions crates/tx-queue/src/evm/evm_tx_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,27 +24,33 @@ use webb::evm::ethers::prelude::TimeLag;
use webb::evm::ethers::providers::Middleware;

use webb::evm::ethers::types;
use webb_relayer_context::RelayerContext;
use webb_relayer_store::queue::{
QueueItemState, QueueStore, TransactionQueueItemKey,
};
use webb_relayer_store::sled::SledQueueKey;
use webb_relayer_utils::clickable_link::ClickableLink;

use super::EvmTxQueueConfig;

/// The TxQueue stores transaction requests so the relayer can process them later.
/// This prevents issues such as creating transactions with the same nonce.
/// Randomized sleep intervals are used to prevent relayers from submitting
/// the same transaction.
#[derive(Clone)]
pub struct TxQueue<S: QueueStore<TypedTransaction>> {
ctx: RelayerContext,
pub struct TxQueue<S, C>
where
S: QueueStore<TypedTransaction>,
C: EvmTxQueueConfig,
{
ctx: C,
chain_id: types::U256,
store: Arc<S>,
}

impl<S> TxQueue<S>
impl<S, C> TxQueue<S, C>
where
S: QueueStore<TypedTransaction, Key = SledQueueKey>,
C: EvmTxQueueConfig,
{
/// Creates a new TxQueue instance.
///
Expand All @@ -55,11 +61,7 @@ where
/// * `ctx` - RelayContext reference that holds the configuration
/// * `chain_id` - The chainId that this queue is for
/// * `store` - [Sled](https://sled.rs)-based database store
pub fn new(
ctx: RelayerContext,
chain_id: types::U256,
store: Arc<S>,
) -> Self {
pub fn new(ctx: C, chain_id: types::U256, store: Arc<S>) -> Self {
Self {
ctx,
chain_id,
Expand All @@ -71,22 +73,14 @@ where
/// Returns a future that resolves `Ok(())` on success, otherwise returns an error.
#[tracing::instrument(skip_all, fields(chain = %self.chain_id))]
pub async fn run(self) -> webb_relayer_utils::Result<()> {
let provider = self.ctx.evm_provider(&self.chain_id).await?;
let wallet = self.ctx.evm_wallet(self.chain_id).await?;
let provider = self.ctx.get_evm_provider(&self.chain_id).await?;
let wallet = self.ctx.get_evm_wallet(&self.chain_id).await?;
let signer_client = SignerMiddleware::new(provider, wallet);

let chain_config = self
.ctx
.config
.evm
.get(&self.chain_id.as_u64().to_string())
.ok_or_else(|| webb_relayer_utils::Error::ChainNotFound {
chain_id: self.chain_id.to_string(),
})?;
let block_confirmations =
self.ctx.block_confirmations(&self.chain_id)?;

// TimeLag client
let client =
TimeLag::new(signer_client, chain_config.block_confirmations);
let client = TimeLag::new(signer_client, block_confirmations);
let chain_id = client
.get_chainid()
.map_err(|_| {
Expand All @@ -110,12 +104,11 @@ where
chain_id = %chain_id,
starting = true,
);
let metrics_clone = self.ctx.metrics.clone();
let task = || async {
loop {
let maybe_item = store
.peek_item(SledQueueKey::from_evm_chain_id(chain_id))?;
let maybe_explorer = &chain_config.explorer;
let maybe_explorer = self.ctx.explorer(&self.chain_id)?;
let Some(item) = maybe_item else {
tokio::time::sleep(Duration::from_millis(100)).await;
continue;
Expand Down Expand Up @@ -353,13 +346,7 @@ where
} else {
tracing::info!("Tx {} Finalized", tx_hash_string,);
}
let gas_price = receipt.gas_used.unwrap_or_default();
// metrics for transaction processed by evm tx queue
let metrics = metrics_clone.lock().await;
metrics.proposals_processed_tx_queue.inc();
metrics.proposals_processed_evm_tx_queue.inc();
// gas spent metric
metrics.gas_spent.inc_by(gas_price.as_u64() as f64);

tracing::event!(
target: webb_relayer_utils::probe::TARGET,
tracing::Level::DEBUG,
Expand Down Expand Up @@ -460,18 +447,14 @@ where

// sleep for a random amount of time.
let max_sleep_interval =
chain_config.tx_queue.max_sleep_interval;
self.ctx.max_sleep_interval(&self.chain_id)?;
let s =
rand::thread_rng().gen_range(1_000..=max_sleep_interval);
tracing::trace!("next queue round after {} ms", s);
tokio::time::sleep(Duration::from_millis(s)).await;
}
};
// transaction queue backoff metric
let metrics = self.ctx.metrics.lock().await;
metrics.transaction_queue_back_off.inc();
metrics.evm_transaction_queue_back_off.inc();
drop(metrics);

backoff::future::retry::<(), _, _, _, _>(backoff, task).await?;
Ok(())
}
Expand Down
38 changes: 38 additions & 0 deletions crates/tx-queue/src/evm/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,43 @@
// limitations under the License.

mod evm_tx_queue;
use std::sync::Arc;

use ethereum_types::U256;
#[doc(hidden)]
pub use evm_tx_queue::*;

use url::Url;
use webb::evm::ethers::{providers::Middleware, signers::LocalWallet};
use webb_relayer_utils::Result;

/// Config trait for EVM tx queue.
#[async_trait::async_trait]
pub trait EvmTxQueueConfig {
type EtherClient: Middleware;
/// Maximum number of milliseconds to wait before dequeuing a transaction from
/// the queue.
fn max_sleep_interval(&self, chain_id: &U256) -> Result<u64>;
/// Block confirmations
fn block_confirmations(&self, chain_id: &U256) -> Result<u8>;
/// Block Explorer for this chain.
///
/// Optional, and only used for printing a clickable links
/// for transactions and contracts.
fn explorer(&self, chain_id: &U256) -> Result<Option<Url>>;
/// Returns a new `EthereumProvider`.
///
/// # Arguments
///
/// * `chain_id` - A string representing the chain id.
async fn get_evm_provider(
&self,
chain_id: &U256,
) -> Result<Arc<Self::EtherClient>>;
/// Returns an EVM wallet.
///
/// # Arguments
///
/// * `chain_id` - A string representing the chain id.
async fn get_evm_wallet(&self, chain_id: &U256) -> Result<LocalWallet>;
}
Loading

0 comments on commit e71db78

Please sign in to comment.