Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(RuntimeTransport): port cups/retry logic #6594

Merged
merged 8 commits into from
Dec 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 11 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ alloy-dyn-abi = { workspace = true, features = ["arbitrary", "eip712"] }
alloy-json-abi.workspace = true
alloy-primitives = { workspace = true, features = ["serde", "getrandom", "arbitrary", "rlp"] }
alloy-rpc-types.workspace = true
alloy-rpc-client.workspace = true
alloy-providers.workspace = true
alloy-transport.workspace = true
alloy-transport-http.workspace = true
Expand Down
1 change: 0 additions & 1 deletion crates/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ extern crate self as foundry_common;
extern crate tracing;

pub mod abi;
pub mod alloy_runtime_transport;
pub mod calc;
pub mod clap_helpers;
pub mod compile;
Expand Down
30 changes: 20 additions & 10 deletions crates/common/src/provider/alloy.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
//! Commonly used helpers to construct `Provider`s

use crate::{
alloy_runtime_transport::RuntimeTransportBuilder, ALCHEMY_FREE_TIER_CUPS, REQUEST_TIMEOUT,
provider::runtime_transport::RuntimeTransportBuilder, ALCHEMY_FREE_TIER_CUPS, REQUEST_TIMEOUT,
};
use alloy_primitives::U256;
use alloy_providers::provider::{Provider, TempProvider};
use alloy_rpc_client::ClientBuilder;
use alloy_transport::BoxTransport;
use ethers_middleware::gas_oracle::{GasCategory, GasOracle, Polygon};
use eyre::{Result, WrapErr};
Expand All @@ -17,6 +18,8 @@ use std::{
};
use url::ParseError;

use super::tower::RetryBackoffLayer;

/// Helper type alias for a retry provider
pub type RetryProvider = Provider<BoxTransport>;

Expand Down Expand Up @@ -217,24 +220,31 @@ impl ProviderBuilder {
let ProviderBuilder {
url,
chain: _,
max_retry: _,
timeout_retry: _,
initial_backoff: _,
max_retry,
timeout_retry,
initial_backoff,
timeout,
compute_units_per_second: _,
compute_units_per_second,
jwt,
headers,
} = self;
let url = url?;

// todo: port alchemy compute units logic?
// todo: provider polling interval
let transport_builder = RuntimeTransportBuilder::new(url.clone())
let retry_layer = RetryBackoffLayer::new(
max_retry,
timeout_retry,
initial_backoff,
compute_units_per_second,
);
let transport = RuntimeTransportBuilder::new(url.clone())
.with_timeout(timeout)
.with_headers(headers)
.with_jwt(jwt);
.with_jwt(jwt)
.build();
let client = ClientBuilder::default().layer(retry_layer).transport(transport, false);

Ok(Provider::new(transport_builder.build().boxed()))
// todo: provider polling interval
Ok(Provider::new_with_client(client.boxed()))
}
}

Expand Down
3 changes: 3 additions & 0 deletions crates/common/src/provider/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,6 @@

pub mod alloy;
pub mod ethers;
pub mod retry;
pub mod runtime_transport;
pub mod tower;
95 changes: 95 additions & 0 deletions crates/common/src/provider/retry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
//! An utility trait for retrying requests based on the error type. See [TransportError].
use alloy_json_rpc::ErrorPayload;
use alloy_transport::TransportError;
use serde::Deserialize;

/// [RetryPolicy] defines logic for which [JsonRpcClient::Error] instances should
/// the client retry the request and try to recover from.
pub trait RetryPolicy: Send + Sync + std::fmt::Debug {
/// Whether to retry the request based on the given `error`
fn should_retry(&self, error: &TransportError) -> bool;

/// Providers may include the `backoff` in the error response directly
fn backoff_hint(&self, error: &TransportError) -> Option<std::time::Duration>;
}

/// Implements [RetryPolicy] that will retry requests that errored with
/// status code 429 i.e. TOO_MANY_REQUESTS
///
/// Infura often fails with a `"header not found"` rpc error which is apparently linked to load
/// balancing, which are retried as well.
#[derive(Clone, Debug, Default)]
pub struct RateLimitRetryPolicy;

impl RetryPolicy for RateLimitRetryPolicy {
fn backoff_hint(&self, error: &TransportError) -> Option<std::time::Duration> {
if let TransportError::ErrorResp(resp) = error {
println!("resp: {:?}", resp);
let data = resp.try_data_as::<serde_json::Value>();
if let Some(Ok(data)) = data {
// if daily rate limit exceeded, infura returns the requested backoff in the error
// response
let backoff_seconds = &data["rate"]["backoff_seconds"];
// infura rate limit error
if let Some(seconds) = backoff_seconds.as_u64() {
return Some(std::time::Duration::from_secs(seconds))
}
if let Some(seconds) = backoff_seconds.as_f64() {
return Some(std::time::Duration::from_secs(seconds as u64 + 1))
}
}
}
None
}

fn should_retry(&self, error: &TransportError) -> bool {
match error {
TransportError::Transport(_) => true,
// The transport could not serialize the error itself. The request was malformed from
// the start.
TransportError::SerError(_) => false,
TransportError::DeserError { text, .. } => {
// some providers send invalid JSON RPC in the error case (no `id:u64`), but the
// text should be a `JsonRpcError`
#[derive(Deserialize)]
struct Resp {
error: ErrorPayload,
}

if let Ok(resp) = serde_json::from_str::<Resp>(text) {
return should_retry_json_rpc_error(&resp.error)
}
false
}
TransportError::ErrorResp(err) => should_retry_json_rpc_error(err),
}
}
}

/// Analyzes the [ErrorPayload] and decides if the request should be retried based on the
/// error code or the message.
fn should_retry_json_rpc_error(error: &ErrorPayload) -> bool {
let ErrorPayload { code, message, .. } = error;
// alchemy throws it this way
if *code == 429 {
return true
}

// This is an infura error code for `exceeded project rate limit`
if *code == -32005 {
return true
}

// alternative alchemy error for specific IPs
if *code == -32016 && message.contains("rate limit") {
return true
}

match message.as_str() {
// this is commonly thrown by infura and is apparently a load balancer issue, see also <https://github.com/MetaMask/metamask-extension/issues/7234>
"header not found" => true,
// also thrown by infura if out of budget for the day and ratelimited
"daily request count exceeded, request rate limited" => true,
_ => false,
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! Runtime transport that connects on first request, which can take either of an HTTP,
//! WebSocket, or IPC transport.
//! WebSocket, or IPC transport and supports retries based on CUPS logic.
use crate::REQUEST_TIMEOUT;
use alloy_json_rpc::{RequestPacket, ResponsePacket};
use alloy_pubsub::{PubSubConnect, PubSubFrontend};
use alloy_transport::{
Expand Down Expand Up @@ -64,6 +65,7 @@ pub enum RuntimeTransportError {
/// A runtime transport is a custom [alloy_transport::Transport] that only connects when the *first*
/// request is made. When the first request is made, it will connect to the runtime using either an
/// HTTP WebSocket, or IPC transport depending on the URL used.
/// It also supports retries for rate-limiting and timeout-related errors.
#[derive(Clone, Debug, Error)]
pub struct RuntimeTransport {
Evalir marked this conversation as resolved.
Show resolved Hide resolved
/// The inner actual transport used.
Expand All @@ -79,6 +81,7 @@ pub struct RuntimeTransport {
}

/// A builder for [RuntimeTransport].
#[derive(Debug)]
pub struct RuntimeTransportBuilder {
url: Url,
headers: Vec<String>,
Expand All @@ -89,7 +92,7 @@ pub struct RuntimeTransportBuilder {
impl RuntimeTransportBuilder {
/// Create a new builder with the given URL.
pub fn new(url: Url) -> Self {
Self { url, headers: vec![], jwt: None, timeout: std::time::Duration::from_secs(30) }
Self { url, headers: vec![], jwt: None, timeout: REQUEST_TIMEOUT }
}

/// Set the URL for the transport.
Expand Down Expand Up @@ -130,16 +133,6 @@ impl ::core::fmt::Display for RuntimeTransport {
}

impl RuntimeTransport {
/// Create a new [RuntimeTransport].
pub fn new(
url: Url,
headers: Vec<String>,
jwt: Option<String>,
timeout: std::time::Duration,
) -> Self {
Self { inner: Arc::new(RwLock::new(None)), url, headers, jwt, timeout }
}

/// Connects the underlying transport, depending on the URL scheme.
pub async fn connect(&self) -> Result<InnerTransport, RuntimeTransportError> {
match self.url.scheme() {
Expand Down Expand Up @@ -210,9 +203,11 @@ impl RuntimeTransport {

/// Sends a request using the underlying transport.
/// If this is the first request, it will connect to the appropiate transport depending on the
/// URL scheme. For sending the actual request, this action is delegated down to the
/// underlying transport through Tower's call. See tower's [tower::Service] trait for more
/// information.
/// URL scheme. When sending the request, retries will be automatically handled depending
/// on the parameters set on the [RuntimeTransport].
/// For sending the actual request, this action is delegated down to the
/// underlying transport through Tower's [tower::Service::call]. See tower's [tower::Service]
/// trait for more information.
pub fn request(&self, req: RequestPacket) -> TransportFut<'static> {
let this = self.clone();
Box::pin(async move {
Expand All @@ -226,10 +221,11 @@ impl RuntimeTransport {
let inner_mut = inner.as_mut().expect("We should have an inner transport.");

match inner_mut {
InnerTransport::Http(http) => http.call(req).await,
InnerTransport::Ws(ws) => ws.call(req).await,
InnerTransport::Ipc(ipc) => ipc.call(req).await,
InnerTransport::Http(http) => http.call(req),
InnerTransport::Ws(ws) => ws.call(req),
InnerTransport::Ipc(ipc) => ipc.call(req),
}
.await
})
}

Expand Down
Loading