Skip to content

Commit

Permalink
refactor(rpc): add builder pattern for EthHandlers (#9035)
Browse files Browse the repository at this point in the history
  • Loading branch information
tcoratger authored Jun 24, 2024
1 parent 97de9d2 commit 8f2522e
Show file tree
Hide file tree
Showing 2 changed files with 230 additions and 84 deletions.
212 changes: 208 additions & 4 deletions crates/rpc/rpc-builder/src/eth.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,27 @@
use crate::RpcModuleConfig;
use reth_evm::ConfigureEvm;
use reth_network_api::{NetworkInfo, Peers};
use reth_provider::{
AccountReader, BlockReaderIdExt, CanonStateSubscriptions, ChainSpecProvider, ChangeSetReader,
EvmEnvProvider, StateProviderFactory,
};
use reth_rpc::{
eth::{
cache::{EthStateCache, EthStateCacheConfig},
gas_oracle::GasPriceOracleConfig,
EthFilterConfig, FeeHistoryCacheConfig, RPC_DEFAULT_GAS_CAP,
cache::{cache_new_blocks_task, EthStateCache, EthStateCacheConfig},
fee_history_cache_new_blocks_task,
gas_oracle::{GasPriceOracle, GasPriceOracleConfig},
traits::RawTransactionForwarder,
EthFilterConfig, FeeHistoryCache, FeeHistoryCacheConfig, RPC_DEFAULT_GAS_CAP,
},
EthApi, EthFilter, EthPubSub,
};
use reth_rpc_server_types::constants::{
default_max_tracing_requests, DEFAULT_MAX_BLOCKS_PER_FILTER, DEFAULT_MAX_LOGS_PER_RESPONSE,
};
use reth_tasks::pool::BlockingTaskPool;
use reth_tasks::{pool::BlockingTaskPool, TaskSpawner};
use reth_transaction_pool::TransactionPool;
use serde::{Deserialize, Serialize};
use std::sync::Arc;

/// All handlers for the `eth` namespace
#[derive(Debug, Clone)]
Expand All @@ -27,6 +38,199 @@ pub struct EthHandlers<Provider, Pool, Network, Events, EvmConfig> {
pub blocking_task_pool: BlockingTaskPool,
}

/// Configuration for `EthHandlersBuilder`
#[derive(Clone, Debug)]
pub(crate) struct EthHandlersConfig<Provider, Pool, Network, Tasks, Events, EvmConfig> {
/// The provider for blockchain data, responsible for reading blocks, accounts, state, etc.
pub(crate) provider: Provider,
/// The transaction pool for managing pending transactions.
pub(crate) pool: Pool,
/// The network information, handling peer connections and network state.
pub(crate) network: Network,
/// The task executor for spawning asynchronous tasks.
pub(crate) executor: Tasks,
/// The event subscriptions for canonical state changes.
pub(crate) events: Events,
/// The EVM configuration for Ethereum Virtual Machine settings.
pub(crate) evm_config: EvmConfig,
/// An optional forwarder for raw transactions.
pub(crate) eth_raw_transaction_forwarder: Option<Arc<dyn RawTransactionForwarder>>,
}

/// Represents the builder for the `EthHandlers` struct, used to configure and create instances of
/// `EthHandlers`.
#[derive(Debug, Clone)]
pub(crate) struct EthHandlersBuilder<Provider, Pool, Network, Tasks, Events, EvmConfig> {
eth_handlers_config: EthHandlersConfig<Provider, Pool, Network, Tasks, Events, EvmConfig>,
/// Configuration for the RPC module
rpc_config: RpcModuleConfig,
}

impl<Provider, Pool, Network, Tasks, Events, EvmConfig>
EthHandlersBuilder<Provider, Pool, Network, Tasks, Events, EvmConfig>
where
Provider: BlockReaderIdExt
+ AccountReader
+ StateProviderFactory
+ EvmEnvProvider
+ ChainSpecProvider
+ ChangeSetReader
+ Clone
+ Unpin
+ 'static,
Pool: TransactionPool + Clone + 'static,
Network: NetworkInfo + Peers + Clone + 'static,
Tasks: TaskSpawner + Clone + 'static,
Events: CanonStateSubscriptions + Clone + 'static,
EvmConfig: ConfigureEvm + 'static,
{
/// Creates a new `EthHandlersBuilder` with the provided components.
pub(crate) const fn new(
eth_handlers_config: EthHandlersConfig<Provider, Pool, Network, Tasks, Events, EvmConfig>,
rpc_config: RpcModuleConfig,
) -> Self {
Self { eth_handlers_config, rpc_config }
}

/// Builds and returns an `EthHandlers` instance.
pub(crate) fn build(self) -> EthHandlers<Provider, Pool, Network, Events, EvmConfig> {
// Initialize the cache
let cache = self.init_cache();

// Initialize the fee history cache
let fee_history_cache = self.init_fee_history_cache(&cache);

// Spawn background tasks for cache
self.spawn_cache_tasks(&cache, &fee_history_cache);

// Initialize the gas oracle
let gas_oracle = self.init_gas_oracle(&cache);

// Initialize the blocking task pool
let blocking_task_pool = self.init_blocking_task_pool();

// Initialize the Eth API
let api = self.init_api(&cache, gas_oracle, &fee_history_cache, &blocking_task_pool);

// Initialize the filter
let filter = self.init_filter(&cache);

// Initialize the pubsub
let pubsub = self.init_pubsub();

EthHandlers { api, cache, filter, pubsub, blocking_task_pool }
}

/// Initializes the `EthStateCache`.
fn init_cache(&self) -> EthStateCache {
EthStateCache::spawn_with(
self.eth_handlers_config.provider.clone(),
self.rpc_config.eth.cache.clone(),
self.eth_handlers_config.executor.clone(),
self.eth_handlers_config.evm_config.clone(),
)
}

/// Initializes the `FeeHistoryCache`.
fn init_fee_history_cache(&self, cache: &EthStateCache) -> FeeHistoryCache {
FeeHistoryCache::new(cache.clone(), self.rpc_config.eth.fee_history_cache.clone())
}

/// Spawns background tasks for updating caches.
fn spawn_cache_tasks(&self, cache: &EthStateCache, fee_history_cache: &FeeHistoryCache) {
// Get the stream of new canonical blocks
let new_canonical_blocks = self.eth_handlers_config.events.canonical_state_stream();

// Clone the cache for the task
let cache_clone = cache.clone();

// Spawn a critical task to update the cache with new blocks
self.eth_handlers_config.executor.spawn_critical(
"cache canonical blocks task",
Box::pin(async move {
cache_new_blocks_task(cache_clone, new_canonical_blocks).await;
}),
);

// Get another stream of new canonical blocks
let new_canonical_blocks = self.eth_handlers_config.events.canonical_state_stream();

// Clone the fee history cache for the task
let fhc_clone = fee_history_cache.clone();

// Clone the provider for the task
let provider_clone = self.eth_handlers_config.provider.clone();

// Spawn a critical task to update the fee history cache with new blocks
self.eth_handlers_config.executor.spawn_critical(
"cache canonical blocks for fee history task",
Box::pin(async move {
fee_history_cache_new_blocks_task(fhc_clone, new_canonical_blocks, provider_clone)
.await;
}),
);
}

/// Initializes the `GasPriceOracle`.
fn init_gas_oracle(&self, cache: &EthStateCache) -> GasPriceOracle<Provider> {
GasPriceOracle::new(
self.eth_handlers_config.provider.clone(),
self.rpc_config.eth.gas_oracle.clone(),
cache.clone(),
)
}

/// Initializes the `BlockingTaskPool`.
fn init_blocking_task_pool(&self) -> BlockingTaskPool {
BlockingTaskPool::build().expect("failed to build tracing pool")
}

/// Initializes the `EthApi`.
fn init_api(
&self,
cache: &EthStateCache,
gas_oracle: GasPriceOracle<Provider>,
fee_history_cache: &FeeHistoryCache,
blocking_task_pool: &BlockingTaskPool,
) -> EthApi<Provider, Pool, Network, EvmConfig> {
EthApi::with_spawner(
self.eth_handlers_config.provider.clone(),
self.eth_handlers_config.pool.clone(),
self.eth_handlers_config.network.clone(),
cache.clone(),
gas_oracle,
self.rpc_config.eth.rpc_gas_cap,
Box::new(self.eth_handlers_config.executor.clone()),
blocking_task_pool.clone(),
fee_history_cache.clone(),
self.eth_handlers_config.evm_config.clone(),
self.eth_handlers_config.eth_raw_transaction_forwarder.clone(),
)
}

/// Initializes the `EthFilter`.
fn init_filter(&self, cache: &EthStateCache) -> EthFilter<Provider, Pool> {
EthFilter::new(
self.eth_handlers_config.provider.clone(),
self.eth_handlers_config.pool.clone(),
cache.clone(),
self.rpc_config.eth.filter_config(),
Box::new(self.eth_handlers_config.executor.clone()),
)
}

/// Initializes the `EthPubSub`.
fn init_pubsub(&self) -> EthPubSub<Provider, Pool, Events, Network> {
EthPubSub::with_spawner(
self.eth_handlers_config.provider.clone(),
self.eth_handlers_config.pool.clone(),
self.eth_handlers_config.events.clone(),
self.eth_handlers_config.network.clone(),
Box::new(self.eth_handlers_config.executor.clone()),
)
}
}

/// Additional config values for the eth namespace.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct EthConfig {
Expand Down
102 changes: 22 additions & 80 deletions crates/rpc/rpc-builder/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,10 @@
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]

use crate::{
auth::AuthRpcModule, cors::CorsDomainError, error::WsHttpSamePortError,
auth::AuthRpcModule,
cors::CorsDomainError,
error::WsHttpSamePortError,
eth::{EthHandlersBuilder, EthHandlersConfig},
metrics::RpcRequestMetrics,
};
use error::{ConflictingModules, RpcError, ServerKind};
Expand All @@ -175,22 +178,13 @@ use reth_provider::{
ChangeSetReader, EvmEnvProvider, StateProviderFactory,
};
use reth_rpc::{
eth::{
cache::{cache_new_blocks_task, EthStateCache},
fee_history_cache_new_blocks_task,
gas_oracle::GasPriceOracle,
traits::RawTransactionForwarder,
EthBundle, FeeHistoryCache,
},
AdminApi, DebugApi, EngineEthApi, EthApi, EthFilter, EthPubSub, EthSubscriptionIdProvider,
NetApi, OtterscanApi, RPCApi, RethApi, TraceApi, TxPoolApi, Web3Api,
eth::{cache::EthStateCache, traits::RawTransactionForwarder, EthBundle},
AdminApi, DebugApi, EngineEthApi, EthApi, EthSubscriptionIdProvider, NetApi, OtterscanApi,
RPCApi, RethApi, TraceApi, TxPoolApi, Web3Api,
};
use reth_rpc_api::servers::*;
use reth_rpc_layer::{AuthLayer, Claims, JwtAuthValidator, JwtSecret};
use reth_tasks::{
pool::{BlockingTaskGuard, BlockingTaskPool},
TaskSpawner, TokioTaskExecutor,
};
use reth_tasks::{pool::BlockingTaskGuard, TaskSpawner, TokioTaskExecutor};
use reth_transaction_pool::{noop::NoopTransactionPool, TransactionPool};
use serde::{Deserialize, Serialize};
use std::{
Expand Down Expand Up @@ -1001,7 +995,7 @@ where
///
/// This will spawn the required service tasks for [`EthApi`] for:
/// - [`EthStateCache`]
/// - [`FeeHistoryCache`]
/// - [`reth_rpc::eth::FeeHistoryCache`]
fn with_eth<F, R>(&mut self, f: F) -> R
where
F: FnOnce(&EthHandlers<Provider, Pool, Network, Events, EvmConfig>) -> R,
Expand All @@ -1013,71 +1007,19 @@ where
}

fn init_eth(&self) -> EthHandlers<Provider, Pool, Network, Events, EvmConfig> {
let cache = EthStateCache::spawn_with(
self.provider.clone(),
self.config.eth.cache.clone(),
self.executor.clone(),
self.evm_config.clone(),
);
let gas_oracle = GasPriceOracle::new(
self.provider.clone(),
self.config.eth.gas_oracle.clone(),
cache.clone(),
);
let new_canonical_blocks = self.events.canonical_state_stream();
let c = cache.clone();

self.executor.spawn_critical(
"cache canonical blocks task",
Box::pin(async move {
cache_new_blocks_task(c, new_canonical_blocks).await;
}),
);

let fee_history_cache =
FeeHistoryCache::new(cache.clone(), self.config.eth.fee_history_cache.clone());
let new_canonical_blocks = self.events.canonical_state_stream();
let fhc = fee_history_cache.clone();
let provider_clone = self.provider.clone();
self.executor.spawn_critical(
"cache canonical blocks for fee history task",
Box::pin(async move {
fee_history_cache_new_blocks_task(fhc, new_canonical_blocks, provider_clone).await;
}),
);

let executor = Box::new(self.executor.clone());
let blocking_task_pool = BlockingTaskPool::build().expect("failed to build tracing pool");
let api = EthApi::with_spawner(
self.provider.clone(),
self.pool.clone(),
self.network.clone(),
cache.clone(),
gas_oracle,
self.config.eth.rpc_gas_cap,
executor.clone(),
blocking_task_pool.clone(),
fee_history_cache,
self.evm_config.clone(),
self.eth_raw_transaction_forwarder.clone(),
);
let filter = EthFilter::new(
self.provider.clone(),
self.pool.clone(),
cache.clone(),
self.config.eth.filter_config(),
executor.clone(),
);

let pubsub = EthPubSub::with_spawner(
self.provider.clone(),
self.pool.clone(),
self.events.clone(),
self.network.clone(),
executor,
);

EthHandlers { api, cache, filter, pubsub, blocking_task_pool }
EthHandlersBuilder::new(
EthHandlersConfig {
provider: self.provider.clone(),
pool: self.pool.clone(),
network: self.network.clone(),
executor: self.executor.clone(),
events: self.events.clone(),
evm_config: self.evm_config.clone(),
eth_raw_transaction_forwarder: self.eth_raw_transaction_forwarder.clone(),
},
self.config.clone(),
)
.build()
}

/// Returns the configured [`EthHandlers`] or creates it if it does not exist yet
Expand Down

0 comments on commit 8f2522e

Please sign in to comment.