From 5f208f49eb0ebcd76fcfd4e9b13652d6084d7926 Mon Sep 17 00:00:00 2001 From: Enrique Ortiz Date: Wed, 13 Dec 2023 15:15:21 -0400 Subject: [PATCH 1/7] feat: port cups/retry logic on RuntimeTransport --- crates/common/src/lib.rs | 1 - crates/common/src/provider/alloy.rs | 15 +- crates/common/src/provider/mod.rs | 2 + crates/common/src/provider/policy.rs | 95 +++++++++ .../runtime_transport.rs} | 190 +++++++++++++++++- 5 files changed, 287 insertions(+), 16 deletions(-) create mode 100644 crates/common/src/provider/policy.rs rename crates/common/src/{alloy_runtime_transport.rs => provider/runtime_transport.rs} (56%) diff --git a/crates/common/src/lib.rs b/crates/common/src/lib.rs index a75ffcbb75a0..ef8eb88528c0 100644 --- a/crates/common/src/lib.rs +++ b/crates/common/src/lib.rs @@ -8,7 +8,6 @@ extern crate self as foundry_common; extern crate tracing; pub mod abi; -pub mod alloy_runtime_transport; pub mod calc; pub mod clap_helpers; pub mod compile; diff --git a/crates/common/src/provider/alloy.rs b/crates/common/src/provider/alloy.rs index a2c29869ab16..71876cb6cbdb 100644 --- a/crates/common/src/provider/alloy.rs +++ b/crates/common/src/provider/alloy.rs @@ -1,7 +1,7 @@ //! Commonly used helpers to construct `Provider`s use crate::{ - alloy_runtime_transport::RuntimeTransportBuilder, ALCHEMY_FREE_TIER_CUPS, REQUEST_TIMEOUT, + provider::runtime_transport::RuntimeTransportBuilder, ALCHEMY_FREE_TIER_CUPS, REQUEST_TIMEOUT, }; use alloy_primitives::U256; use alloy_providers::provider::{Provider, TempProvider}; @@ -217,19 +217,22 @@ impl ProviderBuilder { let ProviderBuilder { url, chain: _, - max_retry: _, - timeout_retry: _, - initial_backoff: _, + max_retry, + timeout_retry, + initial_backoff, timeout, - compute_units_per_second: _, + compute_units_per_second, jwt, headers, } = self; let url = url?; - // todo: port alchemy compute units logic? // todo: provider polling interval let transport_builder = RuntimeTransportBuilder::new(url.clone()) + .with_max_rate_limit_retries(max_retry) + .with_max_timeout_retries(timeout_retry) + .with_cups(compute_units_per_second) + .with_initial_backoff(initial_backoff) .with_timeout(timeout) .with_headers(headers) .with_jwt(jwt); diff --git a/crates/common/src/provider/mod.rs b/crates/common/src/provider/mod.rs index e02e7f7aef39..406195f84736 100644 --- a/crates/common/src/provider/mod.rs +++ b/crates/common/src/provider/mod.rs @@ -1,4 +1,6 @@ //! Provider-related instantiation and usage utilities. +pub mod runtime_transport; pub mod alloy; +pub mod policy; pub mod ethers; diff --git a/crates/common/src/provider/policy.rs b/crates/common/src/provider/policy.rs new file mode 100644 index 000000000000..f340c5cc53df --- /dev/null +++ b/crates/common/src/provider/policy.rs @@ -0,0 +1,95 @@ +//! An utility trait for retrying requests based on the error type. See [TransportError]. +use alloy_json_rpc::ErrorPayload; +use alloy_transport::TransportError; +use serde::Deserialize; + +/// [RetryPolicy] defines logic for which [JsonRpcClient::Error] instances should +/// the client retry the request and try to recover from. +pub trait RetryPolicy: Send + Sync + std::fmt::Debug { + /// Whether to retry the request based on the given `error` + fn should_retry(&self, error: &TransportError) -> bool; + + /// Providers may include the `backoff` in the error response directly + fn backoff_hint(&self, error: &TransportError) -> Option; +} + +/// Implements [RetryPolicy] that will retry requests that errored with +/// status code 429 i.e. 
TOO_MANY_REQUESTS +/// +/// Infura often fails with a `"header not found"` rpc error which is apparently linked to load +/// balancing, which are retried as well. +#[derive(Clone, Debug, Default)] +pub struct RateLimitRetryPolicy; + +impl RetryPolicy for RateLimitRetryPolicy { + fn backoff_hint(&self, error: &TransportError) -> Option { + if let TransportError::ErrorResp(resp) = error { + println!("resp: {:?}", resp); + let data = resp.try_data_as::(); + if let Some(Ok(data)) = data { + // if daily rate limit exceeded, infura returns the requested backoff in the error + // response + let backoff_seconds = &data["rate"]["backoff_seconds"]; + // infura rate limit error + if let Some(seconds) = backoff_seconds.as_u64() { + return Some(std::time::Duration::from_secs(seconds)) + } + if let Some(seconds) = backoff_seconds.as_f64() { + return Some(std::time::Duration::from_secs(seconds as u64 + 1)) + } + } + } + None + } + + fn should_retry(&self, error: &TransportError) -> bool { + match error { + TransportError::Transport(_) => true, + // The transport could not serialize the error itself. The request was malformed from + // the start. + TransportError::SerError(_) => false, + TransportError::DeserError { text, .. } => { + // some providers send invalid JSON RPC in the error case (no `id:u64`), but the + // text should be a `JsonRpcError` + #[derive(Deserialize)] + struct Resp { + error: ErrorPayload, + } + + if let Ok(resp) = serde_json::from_str::(text) { + return should_retry_json_rpc_error(&resp.error) + } + false + } + TransportError::ErrorResp(err) => should_retry_json_rpc_error(err), + } + } +} + +/// Analyzes the [ErrorPayload] and decides if the request should be retried based on the +/// error code or the message. +fn should_retry_json_rpc_error(error: &ErrorPayload) -> bool { + let ErrorPayload { code, message, .. } = error; + // alchemy throws it this way + if *code == 429 { + return true + } + + // This is an infura error code for `exceeded project rate limit` + if *code == -32005 { + return true + } + + // alternative alchemy error for specific IPs + if *code == -32016 && message.contains("rate limit") { + return true + } + + match message.as_str() { + // this is commonly thrown by infura and is apparently a load balancer issue, see also + "header not found" => true, + // also thrown by infura if out of budget for the day and ratelimited + "daily request count exceeded, request rate limited" => true, + _ => false, + } +} \ No newline at end of file diff --git a/crates/common/src/alloy_runtime_transport.rs b/crates/common/src/provider/runtime_transport.rs similarity index 56% rename from crates/common/src/alloy_runtime_transport.rs rename to crates/common/src/provider/runtime_transport.rs index 22844ccc4a01..4170c45c5f7e 100644 --- a/crates/common/src/alloy_runtime_transport.rs +++ b/crates/common/src/provider/runtime_transport.rs @@ -1,5 +1,7 @@ //! Runtime transport that connects on first request, which can take either of an HTTP, -//! WebSocket, or IPC transport. +//! WebSocket, or IPC transport and supports retries based on CUPS logic. 
+use crate::{ALCHEMY_FREE_TIER_CUPS, REQUEST_TIMEOUT}; +use super::policy::{RateLimitRetryPolicy, RetryPolicy}; use alloy_json_rpc::{RequestPacket, ResponsePacket}; use alloy_pubsub::{PubSubConnect, PubSubFrontend}; use alloy_transport::{ @@ -10,7 +12,14 @@ use alloy_transport_ipc::IpcConnect; use alloy_transport_ws::WsConnect; use ethers_providers::{JwtAuth, JwtKey}; use reqwest::header::{HeaderName, HeaderValue}; -use std::{path::PathBuf, str::FromStr, sync::Arc}; +use std::{ + path::PathBuf, + str::FromStr, + sync::{ + atomic::{AtomicU32, Ordering}, + Arc, + }, +}; use thiserror::Error; use tokio::sync::RwLock; use tower::Service; @@ -64,6 +73,7 @@ pub enum RuntimeTransportError { /// A runtime transport is a custom [alloy_transport::Transport] that only connects when the *first* /// request is made. When the first request is made, it will connect to the runtime using either an /// HTTP WebSocket, or IPC transport depending on the URL used. +/// It also supports retries for rate-limiting and timeout-related errors. #[derive(Clone, Debug, Error)] pub struct RuntimeTransport { /// The inner actual transport used. @@ -76,6 +86,18 @@ pub struct RuntimeTransport { jwt: Option, /// The timeout for requests. timeout: std::time::Duration, + /// The current amount of requests being retried. + requests_enqueued: Arc, + /// The policy that will be used to determine whether to retry a request or not. + policy: RateLimitRetryPolicy, + /// Max amount of retries. + max_rate_limit_retries: u32, + /// Max amount of timeout retries + max_timeout_retries: u32, + /// Initial backoff + initial_backoff: u64, + /// Compute units per second + compute_units_per_second: u64, } /// A builder for [RuntimeTransport]. @@ -84,12 +106,25 @@ pub struct RuntimeTransportBuilder { headers: Vec, jwt: Option, timeout: std::time::Duration, + max_rate_limit_retries: u32, + max_timeout_retries: u32, + initial_backoff: u64, + compute_units_per_second: u64, } impl RuntimeTransportBuilder { /// Create a new builder with the given URL. pub fn new(url: Url) -> Self { - Self { url, headers: vec![], jwt: None, timeout: std::time::Duration::from_secs(30) } + Self { + url, + headers: vec![], + jwt: None, + timeout: REQUEST_TIMEOUT, + max_rate_limit_retries: 8, + max_timeout_retries: 8, + initial_backoff: 800, + compute_units_per_second: ALCHEMY_FREE_TIER_CUPS, + } } /// Set the URL for the transport. @@ -110,6 +145,30 @@ impl RuntimeTransportBuilder { self } + /// Set the max rate-limit retries for the transport. + pub fn with_max_rate_limit_retries(mut self, max_retries: u32) -> Self { + self.max_rate_limit_retries = max_retries; + self + } + + /// Set the max timeout retries for the transport. + pub fn with_max_timeout_retries(mut self, max_retries: u32) -> Self { + self.max_timeout_retries = max_retries; + self + } + + /// Set the initial backoff for the transport. + pub fn with_initial_backoff(mut self, initial_backoff: u64) -> Self { + self.initial_backoff = initial_backoff; + self + } + + /// Set the compute units per second for the transport. + pub fn with_cups(mut self, cups: u64) -> Self { + self.compute_units_per_second = cups; + self + } + /// Builds the [RuntimeTransport] and returns it in a disconnected state. /// The runtime transport will then connect when the first request happens. 
pub fn build(self) -> RuntimeTransport { @@ -119,6 +178,12 @@ impl RuntimeTransportBuilder { headers: self.headers, jwt: self.jwt, timeout: self.timeout, + requests_enqueued: Arc::new(AtomicU32::new(0)), + policy: RateLimitRetryPolicy::default(), + max_rate_limit_retries: self.max_rate_limit_retries, + max_timeout_retries: self.max_timeout_retries, + initial_backoff: self.initial_backoff, + compute_units_per_second: self.compute_units_per_second, } } } @@ -136,8 +201,24 @@ impl RuntimeTransport { headers: Vec, jwt: Option, timeout: std::time::Duration, + max_rate_limit_retries: u32, + max_timeout_retries: u32, + compute_units_per_second: u64, + initial_backoff: u64, ) -> Self { - Self { inner: Arc::new(RwLock::new(None)), url, headers, jwt, timeout } + Self { + inner: Arc::new(RwLock::new(None)), + url, + headers, + jwt, + timeout, + max_rate_limit_retries, + max_timeout_retries, + requests_enqueued: Arc::new(AtomicU32::new(0)), + policy: RateLimitRetryPolicy::default(), + initial_backoff, + compute_units_per_second, + } } /// Connects the underlying transport, depending on the URL scheme. @@ -210,7 +291,9 @@ impl RuntimeTransport { /// Sends a request using the underlying transport. /// If this is the first request, it will connect to the appropiate transport depending on the - /// URL scheme. For sending the actual request, this action is delegated down to the + /// URL scheme. When sending the request, retries will be automatically handled depending + /// on the parameters set on the [RuntimeTransport]. + /// For sending the actual request, this action is delegated down to the /// underlying transport through Tower's call. See tower's [tower::Service] trait for more /// information. pub fn request(&self, req: RequestPacket) -> TransportFut<'static> { @@ -225,10 +308,74 @@ impl RuntimeTransport { // SAFETY: We just checked that the inner transport exists. 
let inner_mut = inner.as_mut().expect("We should have an inner transport."); - match inner_mut { - InnerTransport::Http(http) => http.call(req).await, - InnerTransport::Ws(ws) => ws.call(req).await, - InnerTransport::Ipc(ipc) => ipc.call(req).await, + let ahead_in_queue = this.requests_enqueued.fetch_add(1, Ordering::SeqCst) as u64; + + let mut rate_limit_retry_number: u32 = 0; + let mut timeout_retries: u32 = 0; + + loop { + let err; + let fut = match inner_mut { + InnerTransport::Http(http) => http.call(req.clone()), + InnerTransport::Ws(ws) => ws.call(req.clone()), + InnerTransport::Ipc(ipc) => ipc.call(req.clone()), + } + .await; + + match fut { + Ok(res) => { + this.requests_enqueued.fetch_sub(1, Ordering::SeqCst); + return Ok(res) + } + Err(e) => err = e, + } + + let err = TransportError::from(err); + let should_retry = this.policy.should_retry(&err); + if should_retry { + rate_limit_retry_number += 1; + if rate_limit_retry_number > this.max_rate_limit_retries { + return Err(TransportErrorKind::custom_str("Max retries exceeded")) + } + + let current_queued_reqs = this.requests_enqueued.load(Ordering::SeqCst) as u64; + + // try to extract the requested backoff from the error or compute the next + // backoff based on retry count + let mut next_backoff = this.policy.backoff_hint(&err).unwrap_or_else(|| { + std::time::Duration::from_millis(this.initial_backoff as u64) + }); + + // requests are usually weighted and can vary from 10 CU to several 100 CU, + // cheaper requests are more common some example alchemy + // weights: + // - `eth_getStorageAt`: 17 + // - `eth_getBlockByNumber`: 16 + // - `eth_newFilter`: 20 + // + // (coming from forking mode) assuming here that storage request will be the + // driver for Rate limits we choose `17` as the average cost + // of any request + const AVG_COST: u64 = 17u64; + let seconds_to_wait_for_compute_budget = compute_unit_offset_in_secs( + AVG_COST, + this.compute_units_per_second, + current_queued_reqs, + ahead_in_queue, + ); + next_backoff += + std::time::Duration::from_secs(seconds_to_wait_for_compute_budget); + + tokio::time::sleep(next_backoff).await; + } else { + if timeout_retries < this.max_timeout_retries { + timeout_retries += 1; + continue; + } + + this.requests_enqueued.fetch_sub(1, Ordering::SeqCst); + return Err(TransportErrorKind::custom_str("Max retries exceeded")) + } } }) } @@ -312,3 +459,28 @@ fn url_to_file_path(url: &Url) -> Result { fn url_to_file_path(url: &Url) -> Result { url.to_file_path() } + +/// Calculates an offset in seconds by taking into account the number of currently queued requests, +/// number of requests that were ahead in the queue when the request was first issued, the average +/// cost a weighted request (heuristic), and the number of available compute units per seconds. +/// +/// Returns the number of seconds (the unit the remote endpoint measures compute budget) a request +/// is supposed to wait to not get rate limited. The budget per second is +/// `compute_units_per_second`, assuming an average cost of `avg_cost` this allows (in theory) +/// `compute_units_per_second / avg_cost` requests per seconds without getting rate limited. 
+/// By taking into account the number of concurrent request and the position in queue when the +/// request was first issued and determine the number of seconds a request is supposed to wait, if +/// at all +fn compute_unit_offset_in_secs( + avg_cost: u64, + compute_units_per_second: u64, + current_queued_requests: u64, + ahead_in_queue: u64, +) -> u64 { + let request_capacity_per_second = compute_units_per_second.saturating_div(avg_cost); + if current_queued_requests > request_capacity_per_second { + current_queued_requests.min(ahead_in_queue).saturating_div(request_capacity_per_second) + } else { + 0 + } +} \ No newline at end of file From 92996244a3596285f4863f606a4023849da274a1 Mon Sep 17 00:00:00 2001 From: Enrique Ortiz Date: Wed, 13 Dec 2023 15:24:21 -0400 Subject: [PATCH 2/7] clippy/fmt --- crates/common/src/provider/mod.rs | 4 ++-- crates/common/src/provider/policy.rs | 2 +- crates/common/src/provider/runtime_transport.rs | 16 +++++++++------- 3 files changed, 12 insertions(+), 10 deletions(-) diff --git a/crates/common/src/provider/mod.rs b/crates/common/src/provider/mod.rs index 406195f84736..0a72ae2b874f 100644 --- a/crates/common/src/provider/mod.rs +++ b/crates/common/src/provider/mod.rs @@ -1,6 +1,6 @@ //! Provider-related instantiation and usage utilities. -pub mod runtime_transport; pub mod alloy; -pub mod policy; pub mod ethers; +pub mod policy; +pub mod runtime_transport; diff --git a/crates/common/src/provider/policy.rs b/crates/common/src/provider/policy.rs index f340c5cc53df..59bd2c2e80a2 100644 --- a/crates/common/src/provider/policy.rs +++ b/crates/common/src/provider/policy.rs @@ -92,4 +92,4 @@ fn should_retry_json_rpc_error(error: &ErrorPayload) -> bool { "daily request count exceeded, request rate limited" => true, _ => false, } -} \ No newline at end of file +} diff --git a/crates/common/src/provider/runtime_transport.rs b/crates/common/src/provider/runtime_transport.rs index 4170c45c5f7e..1f5ca48ecfce 100644 --- a/crates/common/src/provider/runtime_transport.rs +++ b/crates/common/src/provider/runtime_transport.rs @@ -1,7 +1,7 @@ //! Runtime transport that connects on first request, which can take either of an HTTP, //! WebSocket, or IPC transport and supports retries based on CUPS logic. -use crate::{ALCHEMY_FREE_TIER_CUPS, REQUEST_TIMEOUT}; use super::policy::{RateLimitRetryPolicy, RetryPolicy}; +use crate::{ALCHEMY_FREE_TIER_CUPS, REQUEST_TIMEOUT}; use alloy_json_rpc::{RequestPacket, ResponsePacket}; use alloy_pubsub::{PubSubConnect, PubSubFrontend}; use alloy_transport::{ @@ -179,7 +179,7 @@ impl RuntimeTransportBuilder { jwt: self.jwt, timeout: self.timeout, requests_enqueued: Arc::new(AtomicU32::new(0)), - policy: RateLimitRetryPolicy::default(), + policy: RateLimitRetryPolicy, max_rate_limit_retries: self.max_rate_limit_retries, max_timeout_retries: self.max_timeout_retries, initial_backoff: self.initial_backoff, @@ -196,6 +196,7 @@ impl ::core::fmt::Display for RuntimeTransport { impl RuntimeTransport { /// Create a new [RuntimeTransport]. 
+ #[allow(clippy::too_many_arguments)] pub fn new( url: Url, headers: Vec, @@ -215,7 +216,7 @@ impl RuntimeTransport { max_rate_limit_retries, max_timeout_retries, requests_enqueued: Arc::new(AtomicU32::new(0)), - policy: RateLimitRetryPolicy::default(), + policy: RateLimitRetryPolicy, initial_backoff, compute_units_per_second, } @@ -342,9 +343,10 @@ impl RuntimeTransport { // try to extract the requested backoff from the error or compute the next // backoff based on retry count - let mut next_backoff = this.policy.backoff_hint(&err).unwrap_or_else(|| { - std::time::Duration::from_millis(this.initial_backoff as u64) - }); + let mut next_backoff = this + .policy + .backoff_hint(&err) + .unwrap_or_else(|| std::time::Duration::from_millis(this.initial_backoff)); // requests are usually weighted and can vary from 10 CU to several 100 CU, // cheaper requests are more common some example alchemy @@ -483,4 +485,4 @@ fn compute_unit_offset_in_secs( } else { 0 } -} \ No newline at end of file +} From 6570f66d1800c8fd6847270d85c308b91fb933d8 Mon Sep 17 00:00:00 2001 From: Enrique Ortiz Date: Wed, 13 Dec 2023 16:07:37 -0400 Subject: [PATCH 3/7] fmt --- crates/common/src/provider/runtime_transport.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/common/src/provider/runtime_transport.rs b/crates/common/src/provider/runtime_transport.rs index 1f5ca48ecfce..545b1d2bebc7 100644 --- a/crates/common/src/provider/runtime_transport.rs +++ b/crates/common/src/provider/runtime_transport.rs @@ -295,8 +295,8 @@ impl RuntimeTransport { /// URL scheme. When sending the request, retries will be automatically handled depending /// on the parameters set on the [RuntimeTransport]. /// For sending the actual request, this action is delegated down to the - /// underlying transport through Tower's call. See tower's [tower::Service] trait for more - /// information. + /// underlying transport through Tower's [tower::Service::call]. See tower's [tower::Service] + /// trait for more information. pub fn request(&self, req: RequestPacket) -> TransportFut<'static> { let this = self.clone(); Box::pin(async move { From 7309ecdd8a406889673cbe4d3429de1d27873862 Mon Sep 17 00:00:00 2001 From: Enrique Ortiz Date: Wed, 13 Dec 2023 16:51:10 -0400 Subject: [PATCH 4/7] chore: remove new --- .../common/src/provider/runtime_transport.rs | 27 ------------------- 1 file changed, 27 deletions(-) diff --git a/crates/common/src/provider/runtime_transport.rs b/crates/common/src/provider/runtime_transport.rs index 545b1d2bebc7..9f834516bdb9 100644 --- a/crates/common/src/provider/runtime_transport.rs +++ b/crates/common/src/provider/runtime_transport.rs @@ -195,33 +195,6 @@ impl ::core::fmt::Display for RuntimeTransport { } impl RuntimeTransport { - /// Create a new [RuntimeTransport]. - #[allow(clippy::too_many_arguments)] - pub fn new( - url: Url, - headers: Vec, - jwt: Option, - timeout: std::time::Duration, - max_rate_limit_retries: u32, - max_timeout_retries: u32, - compute_units_per_second: u64, - initial_backoff: u64, - ) -> Self { - Self { - inner: Arc::new(RwLock::new(None)), - url, - headers, - jwt, - timeout, - max_rate_limit_retries, - max_timeout_retries, - requests_enqueued: Arc::new(AtomicU32::new(0)), - policy: RateLimitRetryPolicy, - initial_backoff, - compute_units_per_second, - } - } - /// Connects the underlying transport, depending on the URL scheme. 
pub async fn connect(&self) -> Result { match self.url.scheme() { From a318ec121b0ac128fddd9643151d833a9cf70cf9 Mon Sep 17 00:00:00 2001 From: Enrique Ortiz Date: Wed, 13 Dec 2023 16:57:47 -0400 Subject: [PATCH 5/7] chore: rename to retry --- crates/common/src/provider/mod.rs | 2 +- crates/common/src/provider/{policy.rs => retry.rs} | 0 crates/common/src/provider/runtime_transport.rs | 3 ++- 3 files changed, 3 insertions(+), 2 deletions(-) rename crates/common/src/provider/{policy.rs => retry.rs} (100%) diff --git a/crates/common/src/provider/mod.rs b/crates/common/src/provider/mod.rs index 0a72ae2b874f..68295447ac8c 100644 --- a/crates/common/src/provider/mod.rs +++ b/crates/common/src/provider/mod.rs @@ -2,5 +2,5 @@ pub mod alloy; pub mod ethers; -pub mod policy; +pub mod retry; pub mod runtime_transport; diff --git a/crates/common/src/provider/policy.rs b/crates/common/src/provider/retry.rs similarity index 100% rename from crates/common/src/provider/policy.rs rename to crates/common/src/provider/retry.rs diff --git a/crates/common/src/provider/runtime_transport.rs b/crates/common/src/provider/runtime_transport.rs index 9f834516bdb9..b0ecd61fcf56 100644 --- a/crates/common/src/provider/runtime_transport.rs +++ b/crates/common/src/provider/runtime_transport.rs @@ -1,6 +1,6 @@ //! Runtime transport that connects on first request, which can take either of an HTTP, //! WebSocket, or IPC transport and supports retries based on CUPS logic. -use super::policy::{RateLimitRetryPolicy, RetryPolicy}; +use super::retry::{RateLimitRetryPolicy, RetryPolicy}; use crate::{ALCHEMY_FREE_TIER_CUPS, REQUEST_TIMEOUT}; use alloy_json_rpc::{RequestPacket, ResponsePacket}; use alloy_pubsub::{PubSubConnect, PubSubFrontend}; @@ -101,6 +101,7 @@ pub struct RuntimeTransport { } /// A builder for [RuntimeTransport]. 
+#[derive(Debug)] pub struct RuntimeTransportBuilder { url: Url, headers: Vec, From 583bee1a6ae5b8fc93a5d7b620b6872f0e0dd5f2 Mon Sep 17 00:00:00 2001 From: Enrique Ortiz Date: Wed, 13 Dec 2023 23:43:09 -0400 Subject: [PATCH 6/7] chore: refactor to tower layer --- Cargo.lock | 21 +- crates/common/Cargo.toml | 1 + crates/common/src/provider/alloy.rs | 23 ++- crates/common/src/provider/mod.rs | 1 + .../common/src/provider/runtime_transport.rs | 168 +-------------- crates/common/src/provider/tower.rs | 192 ++++++++++++++++++ 6 files changed, 228 insertions(+), 178 deletions(-) create mode 100644 crates/common/src/provider/tower.rs diff --git a/Cargo.lock b/Cargo.lock index 808058c3f31e..69f9cb25c24e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -123,7 +123,7 @@ dependencies = [ [[package]] name = "alloy-json-rpc" version = "0.1.0" -source = "git+https://github.com/alloy-rs/alloy/?branch=onbjerg/alloy-temp-provider-trait#0ccb86904ef2fc1e8d51790ff3ab4062e1e28c45" +source = "git+https://github.com/alloy-rs/alloy/?branch=onbjerg/alloy-temp-provider-trait#1d825ecdbaa54877813aa372b641181e922d2d86" dependencies = [ "alloy-primitives", "serde", @@ -134,7 +134,7 @@ dependencies = [ [[package]] name = "alloy-networks" version = "0.1.0" -source = "git+https://github.com/alloy-rs/alloy/?branch=onbjerg/alloy-temp-provider-trait#0ccb86904ef2fc1e8d51790ff3ab4062e1e28c45" +source = "git+https://github.com/alloy-rs/alloy/?branch=onbjerg/alloy-temp-provider-trait#1d825ecdbaa54877813aa372b641181e922d2d86" dependencies = [ "alloy-json-rpc", "alloy-primitives", @@ -169,7 +169,7 @@ dependencies = [ [[package]] name = "alloy-providers" version = "0.1.0" -source = "git+https://github.com/alloy-rs/alloy/?branch=onbjerg/alloy-temp-provider-trait#0ccb86904ef2fc1e8d51790ff3ab4062e1e28c45" +source = "git+https://github.com/alloy-rs/alloy/?branch=onbjerg/alloy-temp-provider-trait#1d825ecdbaa54877813aa372b641181e922d2d86" dependencies = [ "alloy-networks", "alloy-primitives", @@ -187,7 +187,7 @@ dependencies = [ [[package]] name = "alloy-pubsub" version = "0.1.0" -source = "git+https://github.com/alloy-rs/alloy/?branch=onbjerg/alloy-temp-provider-trait#0ccb86904ef2fc1e8d51790ff3ab4062e1e28c45" +source = "git+https://github.com/alloy-rs/alloy/?branch=onbjerg/alloy-temp-provider-trait#1d825ecdbaa54877813aa372b641181e922d2d86" dependencies = [ "alloy-json-rpc", "alloy-primitives", @@ -226,7 +226,7 @@ dependencies = [ [[package]] name = "alloy-rpc-client" version = "0.1.0" -source = "git+https://github.com/alloy-rs/alloy/?branch=onbjerg/alloy-temp-provider-trait#0ccb86904ef2fc1e8d51790ff3ab4062e1e28c45" +source = "git+https://github.com/alloy-rs/alloy/?branch=onbjerg/alloy-temp-provider-trait#1d825ecdbaa54877813aa372b641181e922d2d86" dependencies = [ "alloy-json-rpc", "alloy-transport", @@ -243,7 +243,7 @@ dependencies = [ [[package]] name = "alloy-rpc-types" version = "0.1.0" -source = "git+https://github.com/alloy-rs/alloy/?branch=onbjerg/alloy-temp-provider-trait#0ccb86904ef2fc1e8d51790ff3ab4062e1e28c45" +source = "git+https://github.com/alloy-rs/alloy/?branch=onbjerg/alloy-temp-provider-trait#1d825ecdbaa54877813aa372b641181e922d2d86" dependencies = [ "alloy-primitives", "alloy-rlp", @@ -300,7 +300,7 @@ dependencies = [ [[package]] name = "alloy-transport" version = "0.1.0" -source = "git+https://github.com/alloy-rs/alloy/?branch=onbjerg/alloy-temp-provider-trait#0ccb86904ef2fc1e8d51790ff3ab4062e1e28c45" +source = 
"git+https://github.com/alloy-rs/alloy/?branch=onbjerg/alloy-temp-provider-trait#1d825ecdbaa54877813aa372b641181e922d2d86" dependencies = [ "alloy-json-rpc", "base64 0.21.5", @@ -316,7 +316,7 @@ dependencies = [ [[package]] name = "alloy-transport-http" version = "0.1.0" -source = "git+https://github.com/alloy-rs/alloy/?branch=onbjerg/alloy-temp-provider-trait#0ccb86904ef2fc1e8d51790ff3ab4062e1e28c45" +source = "git+https://github.com/alloy-rs/alloy/?branch=onbjerg/alloy-temp-provider-trait#1d825ecdbaa54877813aa372b641181e922d2d86" dependencies = [ "alloy-json-rpc", "alloy-transport", @@ -329,7 +329,7 @@ dependencies = [ [[package]] name = "alloy-transport-ipc" version = "0.1.0" -source = "git+https://github.com/alloy-rs/alloy/?branch=onbjerg/alloy-temp-provider-trait#0ccb86904ef2fc1e8d51790ff3ab4062e1e28c45" +source = "git+https://github.com/alloy-rs/alloy/?branch=onbjerg/alloy-temp-provider-trait#1d825ecdbaa54877813aa372b641181e922d2d86" dependencies = [ "alloy-json-rpc", "alloy-pubsub", @@ -346,7 +346,7 @@ dependencies = [ [[package]] name = "alloy-transport-ws" version = "0.1.0" -source = "git+https://github.com/alloy-rs/alloy/?branch=onbjerg/alloy-temp-provider-trait#0ccb86904ef2fc1e8d51790ff3ab4062e1e28c45" +source = "git+https://github.com/alloy-rs/alloy/?branch=onbjerg/alloy-temp-provider-trait#1d825ecdbaa54877813aa372b641181e922d2d86" dependencies = [ "alloy-pubsub", "alloy-transport", @@ -3192,6 +3192,7 @@ dependencies = [ "alloy-primitives", "alloy-providers", "alloy-pubsub", + "alloy-rpc-client", "alloy-rpc-types", "alloy-sol-types", "alloy-transport", diff --git a/crates/common/Cargo.toml b/crates/common/Cargo.toml index 2c6443703a7c..21f970ccb06f 100644 --- a/crates/common/Cargo.toml +++ b/crates/common/Cargo.toml @@ -22,6 +22,7 @@ alloy-dyn-abi = { workspace = true, features = ["arbitrary", "eip712"] } alloy-json-abi.workspace = true alloy-primitives = { workspace = true, features = ["serde", "getrandom", "arbitrary", "rlp"] } alloy-rpc-types.workspace = true +alloy-rpc-client.workspace = true alloy-providers.workspace = true alloy-transport.workspace = true alloy-transport-http.workspace = true diff --git a/crates/common/src/provider/alloy.rs b/crates/common/src/provider/alloy.rs index 71876cb6cbdb..bb52778145e9 100644 --- a/crates/common/src/provider/alloy.rs +++ b/crates/common/src/provider/alloy.rs @@ -5,6 +5,7 @@ use crate::{ }; use alloy_primitives::U256; use alloy_providers::provider::{Provider, TempProvider}; +use alloy_rpc_client::ClientBuilder; use alloy_transport::BoxTransport; use ethers_middleware::gas_oracle::{GasCategory, GasOracle, Polygon}; use eyre::{Result, WrapErr}; @@ -17,6 +18,8 @@ use std::{ }; use url::ParseError; +use super::tower::RetryBackoffLayer; + /// Helper type alias for a retry provider pub type RetryProvider = Provider; @@ -227,17 +230,21 @@ impl ProviderBuilder { } = self; let url = url?; - // todo: provider polling interval - let transport_builder = RuntimeTransportBuilder::new(url.clone()) - .with_max_rate_limit_retries(max_retry) - .with_max_timeout_retries(timeout_retry) - .with_cups(compute_units_per_second) - .with_initial_backoff(initial_backoff) + let retry_layer = RetryBackoffLayer::new( + max_retry, + timeout_retry, + initial_backoff, + compute_units_per_second, + ); + let transport = RuntimeTransportBuilder::new(url.clone()) .with_timeout(timeout) .with_headers(headers) - .with_jwt(jwt); + .with_jwt(jwt) + .build(); + let client = ClientBuilder::default().layer(retry_layer).transport(transport, false); - 
Ok(Provider::new(transport_builder.build().boxed())) + // todo: provider polling interval + Ok(Provider::new_with_client(client.boxed())) } } diff --git a/crates/common/src/provider/mod.rs b/crates/common/src/provider/mod.rs index 68295447ac8c..cbd9ecbd00ef 100644 --- a/crates/common/src/provider/mod.rs +++ b/crates/common/src/provider/mod.rs @@ -4,3 +4,4 @@ pub mod alloy; pub mod ethers; pub mod retry; pub mod runtime_transport; +pub mod tower; diff --git a/crates/common/src/provider/runtime_transport.rs b/crates/common/src/provider/runtime_transport.rs index b0ecd61fcf56..6f7539420fad 100644 --- a/crates/common/src/provider/runtime_transport.rs +++ b/crates/common/src/provider/runtime_transport.rs @@ -1,7 +1,6 @@ //! Runtime transport that connects on first request, which can take either of an HTTP, //! WebSocket, or IPC transport and supports retries based on CUPS logic. -use super::retry::{RateLimitRetryPolicy, RetryPolicy}; -use crate::{ALCHEMY_FREE_TIER_CUPS, REQUEST_TIMEOUT}; +use crate::REQUEST_TIMEOUT; use alloy_json_rpc::{RequestPacket, ResponsePacket}; use alloy_pubsub::{PubSubConnect, PubSubFrontend}; use alloy_transport::{ @@ -12,14 +11,7 @@ use alloy_transport_ipc::IpcConnect; use alloy_transport_ws::WsConnect; use ethers_providers::{JwtAuth, JwtKey}; use reqwest::header::{HeaderName, HeaderValue}; -use std::{ - path::PathBuf, - str::FromStr, - sync::{ - atomic::{AtomicU32, Ordering}, - Arc, - }, -}; +use std::{path::PathBuf, str::FromStr, sync::Arc}; use thiserror::Error; use tokio::sync::RwLock; use tower::Service; @@ -86,18 +78,6 @@ pub struct RuntimeTransport { jwt: Option, /// The timeout for requests. timeout: std::time::Duration, - /// The current amount of requests being retried. - requests_enqueued: Arc, - /// The policy that will be used to determine whether to retry a request or not. - policy: RateLimitRetryPolicy, - /// Max amount of retries. - max_rate_limit_retries: u32, - /// Max amount of timeout retries - max_timeout_retries: u32, - /// Initial backoff - initial_backoff: u64, - /// Compute units per second - compute_units_per_second: u64, } /// A builder for [RuntimeTransport]. @@ -107,25 +87,12 @@ pub struct RuntimeTransportBuilder { headers: Vec, jwt: Option, timeout: std::time::Duration, - max_rate_limit_retries: u32, - max_timeout_retries: u32, - initial_backoff: u64, - compute_units_per_second: u64, } impl RuntimeTransportBuilder { /// Create a new builder with the given URL. pub fn new(url: Url) -> Self { - Self { - url, - headers: vec![], - jwt: None, - timeout: REQUEST_TIMEOUT, - max_rate_limit_retries: 8, - max_timeout_retries: 8, - initial_backoff: 800, - compute_units_per_second: ALCHEMY_FREE_TIER_CUPS, - } + Self { url, headers: vec![], jwt: None, timeout: REQUEST_TIMEOUT } } /// Set the URL for the transport. @@ -146,30 +113,6 @@ impl RuntimeTransportBuilder { self } - /// Set the max rate-limit retries for the transport. - pub fn with_max_rate_limit_retries(mut self, max_retries: u32) -> Self { - self.max_rate_limit_retries = max_retries; - self - } - - /// Set the max timeout retries for the transport. - pub fn with_max_timeout_retries(mut self, max_retries: u32) -> Self { - self.max_timeout_retries = max_retries; - self - } - - /// Set the initial backoff for the transport. - pub fn with_initial_backoff(mut self, initial_backoff: u64) -> Self { - self.initial_backoff = initial_backoff; - self - } - - /// Set the compute units per second for the transport. 
- pub fn with_cups(mut self, cups: u64) -> Self { - self.compute_units_per_second = cups; - self - } - /// Builds the [RuntimeTransport] and returns it in a disconnected state. /// The runtime transport will then connect when the first request happens. pub fn build(self) -> RuntimeTransport { @@ -179,12 +122,6 @@ impl RuntimeTransportBuilder { headers: self.headers, jwt: self.jwt, timeout: self.timeout, - requests_enqueued: Arc::new(AtomicU32::new(0)), - policy: RateLimitRetryPolicy, - max_rate_limit_retries: self.max_rate_limit_retries, - max_timeout_retries: self.max_timeout_retries, - initial_backoff: self.initial_backoff, - compute_units_per_second: self.compute_units_per_second, } } } @@ -283,76 +220,12 @@ impl RuntimeTransport { // SAFETY: We just checked that the inner transport exists. let inner_mut = inner.as_mut().expect("We should have an inner transport."); - let ahead_in_queue = this.requests_enqueued.fetch_add(1, Ordering::SeqCst) as u64; - - let mut rate_limit_retry_number: u32 = 0; - let mut timeout_retries: u32 = 0; - - loop { - let err; - let fut = match inner_mut { - InnerTransport::Http(http) => http.call(req.clone()), - InnerTransport::Ws(ws) => ws.call(req.clone()), - InnerTransport::Ipc(ipc) => ipc.call(req.clone()), - } - .await; - - match fut { - Ok(res) => { - this.requests_enqueued.fetch_sub(1, Ordering::SeqCst); - return Ok(res) - } - Err(e) => err = e, - } - - let err = TransportError::from(err); - let should_retry = this.policy.should_retry(&err); - if should_retry { - rate_limit_retry_number += 1; - if rate_limit_retry_number > this.max_rate_limit_retries { - return Err(TransportErrorKind::custom_str("Max retries exceeded")) - } - - let current_queued_reqs = this.requests_enqueued.load(Ordering::SeqCst) as u64; - - // try to extract the requested backoff from the error or compute the next - // backoff based on retry count - let mut next_backoff = this - .policy - .backoff_hint(&err) - .unwrap_or_else(|| std::time::Duration::from_millis(this.initial_backoff)); - - // requests are usually weighted and can vary from 10 CU to several 100 CU, - // cheaper requests are more common some example alchemy - // weights: - // - `eth_getStorageAt`: 17 - // - `eth_getBlockByNumber`: 16 - // - `eth_newFilter`: 20 - // - // (coming from forking mode) assuming here that storage request will be the - // driver for Rate limits we choose `17` as the average cost - // of any request - const AVG_COST: u64 = 17u64; - let seconds_to_wait_for_compute_budget = compute_unit_offset_in_secs( - AVG_COST, - this.compute_units_per_second, - current_queued_reqs, - ahead_in_queue, - ); - next_backoff += - std::time::Duration::from_secs(seconds_to_wait_for_compute_budget); - - tokio::time::sleep(next_backoff).await; - } else { - if timeout_retries < this.max_timeout_retries { - timeout_retries += 1; - continue; - } - - this.requests_enqueued.fetch_sub(1, Ordering::SeqCst); - return Err(TransportErrorKind::custom_str("Max retries exceeded")) - } + match inner_mut { + InnerTransport::Http(http) => http.call(req.clone()), + InnerTransport::Ws(ws) => ws.call(req.clone()), + InnerTransport::Ipc(ipc) => ipc.call(req.clone()), } + .await }) } @@ -435,28 +308,3 @@ fn url_to_file_path(url: &Url) -> Result { fn url_to_file_path(url: &Url) -> Result { url.to_file_path() } - -/// Calculates an offset in seconds by taking into account the number of currently queued requests, -/// number of requests that were ahead in the queue when the request was first issued, the average -/// cost a weighted request 
(heuristic), and the number of available compute units per seconds. -/// -/// Returns the number of seconds (the unit the remote endpoint measures compute budget) a request -/// is supposed to wait to not get rate limited. The budget per second is -/// `compute_units_per_second`, assuming an average cost of `avg_cost` this allows (in theory) -/// `compute_units_per_second / avg_cost` requests per seconds without getting rate limited. -/// By taking into account the number of concurrent request and the position in queue when the -/// request was first issued and determine the number of seconds a request is supposed to wait, if -/// at all -fn compute_unit_offset_in_secs( - avg_cost: u64, - compute_units_per_second: u64, - current_queued_requests: u64, - ahead_in_queue: u64, -) -> u64 { - let request_capacity_per_second = compute_units_per_second.saturating_div(avg_cost); - if current_queued_requests > request_capacity_per_second { - current_queued_requests.min(ahead_in_queue).saturating_div(request_capacity_per_second) - } else { - 0 - } -} diff --git a/crates/common/src/provider/tower.rs b/crates/common/src/provider/tower.rs new file mode 100644 index 000000000000..5e08b836c991 --- /dev/null +++ b/crates/common/src/provider/tower.rs @@ -0,0 +1,192 @@ +//! Alloy-related tower middleware for retrying rate-limited requests +//! and applying backoff. +use std::{ + sync::{ + atomic::{AtomicU32, Ordering}, + Arc, + }, + task::{Context, Poll}, +}; + +use alloy_json_rpc::{RequestPacket, ResponsePacket}; +use alloy_transport::{TransportError, TransportErrorKind, TransportFut}; +use tower::Service; + +use super::{ + retry::{RateLimitRetryPolicy, RetryPolicy}, + runtime_transport::RuntimeTransport, +}; + +/// An Alloy Tower Layer that is responsible for retrying requests based on the +/// error type. See [TransportError]. +#[derive(Debug, Clone)] +pub struct RetryBackoffLayer { + /// The maximum number of retries for rate limit errors + max_rate_limit_retries: u32, + /// The maximum number of retries for timeout errors + max_timeout_retries: u32, + /// The initial backoff in milliseconds + initial_backoff: u64, + /// The number of compute units per second for this provider + compute_units_per_second: u64, +} + +impl RetryBackoffLayer { + /// Creates a new [RetryWithPolicyLayer] with the given parameters + pub fn new( + max_rate_limit_retries: u32, + max_timeout_retries: u32, + initial_backoff: u64, + compute_units_per_second: u64, + ) -> Self { + Self { + max_rate_limit_retries, + max_timeout_retries, + initial_backoff, + compute_units_per_second, + } + } +} + +impl tower::layer::Layer for RetryBackoffLayer { + type Service = RetryBackoffService; + + fn layer(&self, inner: S) -> Self::Service { + RetryBackoffService { + inner, + policy: RateLimitRetryPolicy, + max_rate_limit_retries: self.max_rate_limit_retries, + max_timeout_retries: self.max_timeout_retries, + initial_backoff: self.initial_backoff, + compute_units_per_second: self.compute_units_per_second, + requests_enqueued: Arc::new(AtomicU32::new(0)), + } + } +} + +/// An Alloy Tower Service that is responsible for retrying requests based on the +/// error type. See [TransportError] and [RetryWithPolicyLayer]. 
+#[derive(Debug, Clone)] +pub struct RetryBackoffService { + /// The inner service + inner: S, + /// The retry policy + policy: RateLimitRetryPolicy, + /// The maximum number of retries for rate limit errors + max_rate_limit_retries: u32, + /// The maximum number of retries for timeout errors + max_timeout_retries: u32, + /// The initial backoff in milliseconds + initial_backoff: u64, + /// The number of compute units per second for this service + compute_units_per_second: u64, + /// The number of requests currently enqueued + requests_enqueued: Arc, +} + +// impl tower service +impl Service for RetryBackoffService { + type Response = ResponsePacket; + type Error = TransportError; + type Future = TransportFut<'static>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + // Our middleware doesn't care about backpressure, so it's ready as long + // as the inner service is ready. + self.inner.poll_ready(cx) + } + + fn call(&mut self, request: RequestPacket) -> Self::Future { + let mut this = self.clone(); + Box::pin(async move { + let ahead_in_queue = this.requests_enqueued.fetch_add(1, Ordering::SeqCst) as u64; + let mut rate_limit_retry_number: u32 = 0; + let mut timeout_retries: u32 = 0; + loop { + let err; + let fut = this.inner.call(request.clone()).await; + + match fut { + Ok(res) => { + this.requests_enqueued.fetch_sub(1, Ordering::SeqCst); + return Ok(res) + } + Err(e) => err = e, + } + + let err = TransportError::from(err); + let should_retry = this.policy.should_retry(&err); + if should_retry { + rate_limit_retry_number += 1; + if rate_limit_retry_number > this.max_rate_limit_retries { + return Err(TransportErrorKind::custom_str("Max retries exceeded")) + } + + let current_queued_reqs = this.requests_enqueued.load(Ordering::SeqCst) as u64; + + // try to extract the requested backoff from the error or compute the next + // backoff based on retry count + let mut next_backoff = this + .policy + .backoff_hint(&err) + .unwrap_or_else(|| std::time::Duration::from_millis(this.initial_backoff)); + + // requests are usually weighted and can vary from 10 CU to several 100 CU, + // cheaper requests are more common some example alchemy + // weights: + // - `eth_getStorageAt`: 17 + // - `eth_getBlockByNumber`: 16 + // - `eth_newFilter`: 20 + // + // (coming from forking mode) assuming here that storage request will be the + // driver for Rate limits we choose `17` as the average cost + // of any request + const AVG_COST: u64 = 17u64; + let seconds_to_wait_for_compute_budget = compute_unit_offset_in_secs( + AVG_COST, + this.compute_units_per_second, + current_queued_reqs, + ahead_in_queue, + ); + next_backoff += + std::time::Duration::from_secs(seconds_to_wait_for_compute_budget); + + tokio::time::sleep(next_backoff).await; + } else { + if timeout_retries < this.max_timeout_retries { + timeout_retries += 1; + continue; + } + + this.requests_enqueued.fetch_sub(1, Ordering::SeqCst); + return Err(TransportErrorKind::custom_str("Max retries exceeded")) + } + } + }) + } +} + +/// Calculates an offset in seconds by taking into account the number of currently queued requests, +/// number of requests that were ahead in the queue when the request was first issued, the average +/// cost a weighted request (heuristic), and the number of available compute units per seconds. +/// +/// Returns the number of seconds (the unit the remote endpoint measures compute budget) a request +/// is supposed to wait to not get rate limited. 
The budget per second is +/// `compute_units_per_second`, assuming an average cost of `avg_cost` this allows (in theory) +/// `compute_units_per_second / avg_cost` requests per seconds without getting rate limited. +/// By taking into account the number of concurrent request and the position in queue when the +/// request was first issued and determine the number of seconds a request is supposed to wait, if +/// at all +fn compute_unit_offset_in_secs( + avg_cost: u64, + compute_units_per_second: u64, + current_queued_requests: u64, + ahead_in_queue: u64, +) -> u64 { + let request_capacity_per_second = compute_units_per_second.saturating_div(avg_cost); + if current_queued_requests > request_capacity_per_second { + current_queued_requests.min(ahead_in_queue).saturating_div(request_capacity_per_second) + } else { + 0 + } +} From 2ebcb7b6b5c4f4c6e0d608646287088b7f2c8913 Mon Sep 17 00:00:00 2001 From: Enrique Ortiz Date: Wed, 13 Dec 2023 23:45:48 -0400 Subject: [PATCH 7/7] chore: no unneeded clone --- crates/common/src/provider/runtime_transport.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/common/src/provider/runtime_transport.rs b/crates/common/src/provider/runtime_transport.rs index 6f7539420fad..67f83998d44c 100644 --- a/crates/common/src/provider/runtime_transport.rs +++ b/crates/common/src/provider/runtime_transport.rs @@ -221,9 +221,9 @@ impl RuntimeTransport { let inner_mut = inner.as_mut().expect("We should have an inner transport."); match inner_mut { - InnerTransport::Http(http) => http.call(req.clone()), - InnerTransport::Ws(ws) => ws.call(req.clone()), - InnerTransport::Ipc(ipc) => ipc.call(req.clone()), + InnerTransport::Http(http) => http.call(req), + InnerTransport::Ws(ws) => ws.call(req), + InnerTransport::Ipc(ipc) => ipc.call(req), } .await })
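
For context on the backoff heuristic documented above, the following is a minimal illustrative sketch (not part of the patch series) of how compute_unit_offset_in_secs is expected to behave. It assumes a compute budget of 330 CU/s — the Alchemy free-tier figure; the actual ALCHEMY_FREE_TIER_CUPS constant is defined elsewhere in the crate, so that value is an assumption here — together with the 17 CU average request cost used in the retry loop.

    // Sketch only: exercises compute_unit_offset_in_secs as defined in the
    // diffs above, assuming a budget of 330 compute units per second and the
    // 17 CU average request cost chosen in the retry loop.
    #[test]
    fn compute_unit_offset_sketch() {
        // 330 CU/s at ~17 CU per request allows roughly 19 requests per
        // second, so a small backlog adds no extra wait.
        assert_eq!(compute_unit_offset_in_secs(17, 330, 5, 5), 0);
        // With 40 requests queued and this request 25th in line, the offset is
        // min(40, 25) / 19 = 1 extra second on top of the regular backoff.
        assert_eq!(compute_unit_offset_in_secs(17, 330, 40, 25), 1);
    }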