From f8413844f9c3fc8156d123e1bba1e88ccfb182b5 Mon Sep 17 00:00:00 2001 From: smatthewenglish Date: Sun, 7 Jul 2024 20:12:27 -0400 Subject: [PATCH 01/12] remove intermediary types, update effected tests, add ipc start --- Cargo.toml | 2 +- crates/node/builder/src/rpc.rs | 29 ++- crates/rpc/rpc-builder/src/lib.rs | 272 ++++----------------- crates/rpc/rpc-builder/tests/it/startup.rs | 56 ++--- crates/rpc/rpc-builder/tests/it/utils.rs | 40 ++- examples/rpc-db/src/main.rs | 4 +- 6 files changed, 106 insertions(+), 297 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 84b4bfb81afb..1a0a112e23ab 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -535,4 +535,4 @@ proptest-derive = "0.5" serial_test = "3" similar-asserts = "1.5.0" test-fuzz = "5" -iai-callgrind = "0.11" +iai-callgrind = "0.11.1" diff --git a/crates/node/builder/src/rpc.rs b/crates/node/builder/src/rpc.rs index 7f6cb5e898cc..84761f24ac4f 100644 --- a/crates/node/builder/src/rpc.rs +++ b/crates/node/builder/src/rpc.rs @@ -288,19 +288,19 @@ where extend_rpc_modules.extend_rpc_modules(ctx)?; - let server_config = config.rpc.rpc_server_config(); - let launch_rpc = modules.clone().start_server(server_config).map_ok(|handle| { - if let Some(path) = handle.ipc_endpoint() { - info!(target: "reth::cli", %path, "RPC IPC server started"); - } - if let Some(addr) = handle.http_local_addr() { - info!(target: "reth::cli", url=%addr, "RPC HTTP server started"); - } - if let Some(addr) = handle.ws_local_addr() { - info!(target: "reth::cli", url=%addr, "RPC WS server started"); - } - handle - }); + let mut server_config = config.rpc.rpc_server_config(); + let cloned_modules = modules.clone(); + let launch_rpc = server_config.start(&cloned_modules).await?; + + if let Some(path) = launch_rpc.ipc_endpoint() { + info!(target: "reth::cli", %path, "RPC IPC server started"); + } + if let Some(addr) = launch_rpc.http_local_addr() { + info!(target: "reth::cli", url=%addr, "RPC HTTP server started"); + } + if let Some(addr) = launch_rpc.ws_local_addr() { + info!(target: "reth::cli", url=%addr, "RPC WS server started"); + } let launch_auth = auth_module.clone().start_server(auth_config).map_ok(|handle| { let addr = handle.local_addr(); @@ -313,8 +313,7 @@ where }); // launch servers concurrently - let (rpc, auth) = futures::future::try_join(launch_rpc, launch_auth).await?; - let handles = RethRpcServerHandles { rpc, auth }; + let handles = RethRpcServerHandles { rpc: launch_rpc, auth: launch_auth.await? }; let ctx = RpcContext { node, diff --git a/crates/rpc/rpc-builder/src/lib.rs b/crates/rpc/rpc-builder/src/lib.rs index ee57e79e9aa7..458423a24633 100644 --- a/crates/rpc/rpc-builder/src/lib.rs +++ b/crates/rpc/rpc-builder/src/lib.rs @@ -8,9 +8,8 @@ //! transaction pool. [`RpcModuleBuilder::build`] returns a [`TransportRpcModules`] which contains //! the transport specific config (what APIs are available via this transport). //! -//! The [`RpcServerConfig`] is used to configure the [`RpcServer`] type which contains all transport -//! implementations (http server, ws server, ipc server). [`RpcServer::start`] requires the -//! [`TransportRpcModules`] so it can start the servers with the configured modules. +//! The [`RpcServerConfig`] is used to assemble and start the http server, ws server, ipc servers, +//! it requires the [`TransportRpcModules`] so it can start the servers with the configured modules. //! //! # Examples //! @@ -56,11 +55,8 @@ //! evm_config, //! ) //! .build(transports, EthApiBuild::build); -//! let handle = RpcServerConfig::default() -//! .with_http(ServerBuilder::default()) -//! .start(transport_modules) -//! .await -//! .unwrap(); +//! let mut config = RpcServerConfig::default().with_http(ServerBuilder::default()); +//! config.start(&transport_modules).await; //! } //! ``` //! @@ -119,11 +115,10 @@ //! //! // start the servers //! let auth_config = AuthServerConfig::builder(JwtSecret::random()).build(); -//! let config = RpcServerConfig::default(); +//! let mut config = RpcServerConfig::default(); //! //! let (_rpc_handle, _auth_handle) = -//! try_join!(modules.start_server(config), auth_module.start_server(auth_config),) -//! .unwrap(); +//! try_join!(config.start(&modules), auth_module.start_server(auth_config),).unwrap(); //! } //! ``` @@ -137,7 +132,6 @@ use std::{ collections::HashMap, - fmt, net::{Ipv4Addr, SocketAddr, SocketAddrV4}, sync::Arc, time::{Duration, SystemTime, UNIX_EPOCH}, @@ -147,12 +141,11 @@ use error::{ConflictingModules, RpcError, ServerKind}; use http::{header::AUTHORIZATION, HeaderMap}; use jsonrpsee::{ core::RegisterMethodError, - server::{AlreadyStoppedError, IdProvider, RpcServiceBuilder, Server, ServerHandle}, + server::{AlreadyStoppedError, IdProvider, RpcServiceBuilder, ServerHandle}, Methods, RpcModule, }; use reth_engine_primitives::EngineTypes; use reth_evm::ConfigureEvm; -use reth_ipc::server::IpcServer; use reth_network_api::{noop::NoopNetwork, NetworkInfo, Peers}; use reth_provider::{ AccountReader, BlockReader, CanonStateSubscriptions, ChainSpecProvider, ChangeSetReader, @@ -175,7 +168,6 @@ use reth_tasks::{pool::BlockingTaskGuard, TaskSpawner, TokioTaskExecutor}; use reth_transaction_pool::{noop::NoopTransactionPool, TransactionPool}; use serde::{Deserialize, Serialize}; use tower_http::cors::CorsLayer; -use tracing::{instrument, trace}; use crate::{ auth::AuthRpcModule, cors::CorsDomainError, error::WsHttpSamePortError, @@ -237,10 +229,12 @@ where EthApi: FullEthApiServer, { let module_config = module_config.into(); - let server_config = server_config.into(); - RpcModuleBuilder::new(provider, pool, network, executor, events, evm_config) - .build(module_config, eth) - .start_server(server_config) + server_config + .into() + .start( + &RpcModuleBuilder::new(provider, pool, network, executor, events, evm_config) + .build(module_config, eth), + ) .await } @@ -501,8 +495,6 @@ where /// Configures all [`RpcModule`]s specific to the given [`TransportRpcModuleConfig`] which can /// be used to start the transport server(s). - /// - /// See also [`RpcServer::start`] pub fn build( self, module_config: TransportRpcModuleConfig, @@ -1283,11 +1275,6 @@ impl RpcServerConfig { self.ipc_endpoint.clone() } - /// Convenience function to do [`RpcServerConfig::build`] and [`RpcServer::start`] in one step - pub async fn start(self, modules: TransportRpcModules) -> Result { - self.build(&modules).await?.start(modules).await - } - /// Creates the [`CorsLayer`] if any fn maybe_cors_layer(cors: Option) -> Result, CorsDomainError> { cors.as_deref().map(cors::create_cors_layer).transpose() @@ -1298,13 +1285,17 @@ impl RpcServerConfig { self.jwt_secret.map(|secret| AuthLayer::new(JwtAuthValidator::new(secret))) } - /// Builds the ws and http server(s). + /// Builds and starts the ws and http server(s). /// /// If both are on the same port, they are combined into one server. - async fn build_ws_http( + pub async fn start( &mut self, modules: &TransportRpcModules, - ) -> Result { + ) -> Result { + let mut http_handle = None; + let mut ws_handle = None; + let mut ipc_handle = None; + let http_socket_addr = self.http_addr.unwrap_or(SocketAddr::V4(SocketAddrV4::new( Ipv4Addr::LOCALHOST, constants::DEFAULT_HTTP_RPC_PORT, @@ -1315,6 +1306,17 @@ impl RpcServerConfig { constants::DEFAULT_WS_RPC_PORT, ))); + let metrics = modules.ipc.as_ref().map(RpcRequestMetrics::ipc).unwrap_or_default(); + let ipc_path = + self.ipc_endpoint.clone().unwrap_or_else(|| constants::DEFAULT_IPC_ENDPOINT.into()); + + if let Some(builder) = self.ipc_server_config.take() { + let ipc = builder + .set_rpc_middleware(IpcRpcServiceBuilder::new().layer(metrics)) + .build(ipc_path); + ipc_handle = Some(ipc.start(modules.ipc.clone().expect("ipc server error")).await?); + } + // If both are configured on the same port, we combine them into one server. if self.http_addr == self.ws_addr && self.http_server_config.is_some() && @@ -1327,7 +1329,7 @@ impl RpcServerConfig { http_cors_domains: Some(http_cors.clone()), ws_cors_domains: Some(ws_cors.clone()), } - .into()) + .into()); } Some(ws_cors) } @@ -1363,12 +1365,20 @@ impl RpcServerConfig { let addr = server .local_addr() .map_err(|err| RpcError::server_error(err, ServerKind::WsHttp(http_socket_addr)))?; - return Ok(WsHttpServer { + if let Some(module) = modules.http.as_ref().or(modules.ws.as_ref()) { + let handle = server.start(module.clone()); + http_handle = Some(handle.clone()); + ws_handle = Some(handle); + } + return Ok(RpcServerHandle { http_local_addr: Some(addr), ws_local_addr: Some(addr), - server: WsHttpServers::SamePort(server), + http: http_handle, + ws: ws_handle, + ipc_endpoint: self.ipc_endpoint.clone(), + ipc: ipc_handle, jwt_secret: self.jwt_secret, - }) + }); } let mut http_local_addr = None; @@ -1421,37 +1431,20 @@ impl RpcServerConfig { http_local_addr = Some(local_addr); http_server = Some(server); } - - Ok(WsHttpServer { + http_handle = http_server + .map(|http_server| http_server.start(modules.http.clone().expect("http server error"))); + ws_handle = ws_server + .map(|ws_server| ws_server.start(modules.ws.clone().expect("ws server error"))); + Ok(RpcServerHandle { http_local_addr, ws_local_addr, - server: WsHttpServers::DifferentPort { http: http_server, ws: ws_server }, + http: http_handle, + ws: ws_handle, + ipc_endpoint: self.ipc_endpoint.clone(), + ipc: ipc_handle, jwt_secret: self.jwt_secret, }) } - - /// Finalize the configuration of the server(s). - /// - /// This consumes the builder and returns a server. - /// - /// Note: The server is not started and does nothing unless polled, See also - /// [`RpcServer::start`] - pub async fn build(mut self, modules: &TransportRpcModules) -> Result { - let mut server = RpcServer::empty(); - server.ws_http = self.build_ws_http(modules).await?; - - if let Some(builder) = self.ipc_server_config { - let metrics = modules.ipc.as_ref().map(RpcRequestMetrics::ipc).unwrap_or_default(); - let ipc_path = - self.ipc_endpoint.unwrap_or_else(|| constants::DEFAULT_IPC_ENDPOINT.into()); - let ipc = builder - .set_rpc_middleware(IpcRpcServiceBuilder::new().layer(metrics)) - .build(ipc_path); - server.ipc = Some(ipc); - } - - Ok(server) - } } /// Holds modules to be installed per transport type @@ -1658,167 +1651,6 @@ impl TransportRpcModules { self.merge_ipc(other)?; Ok(()) } - - /// Convenience function for starting a server - pub async fn start_server(self, builder: RpcServerConfig) -> Result { - builder.start(self).await - } -} - -/// Container type for ws and http servers in all possible combinations. -#[derive(Default)] -struct WsHttpServer { - /// The address of the http server - http_local_addr: Option, - /// The address of the ws server - ws_local_addr: Option, - /// Configured ws,http servers - server: WsHttpServers, - /// The jwt secret. - jwt_secret: Option, -} - -// Define the type alias with detailed type complexity -type WsHttpServerKind = Server< - Stack< - tower::util::Either, Identity>, - Stack, Identity>, - >, - Stack, ->; - -/// Enum for holding the http and ws servers in all possible combinations. -enum WsHttpServers { - /// Both servers are on the same port - SamePort(WsHttpServerKind), - /// Servers are on different ports - DifferentPort { http: Option, ws: Option }, -} - -// === impl WsHttpServers === - -impl WsHttpServers { - /// Starts the servers and returns the handles (http, ws) - fn start( - self, - http_module: Option>, - ws_module: Option>, - config: &TransportRpcModuleConfig, - ) -> Result<(Option, Option), RpcError> { - let mut http_handle = None; - let mut ws_handle = None; - match self { - Self::SamePort(server) => { - // Make sure http and ws modules are identical, since we currently can't run - // different modules on same server - config.ensure_ws_http_identical()?; - - if let Some(module) = http_module.or(ws_module) { - let handle = server.start(module); - http_handle = Some(handle.clone()); - ws_handle = Some(handle); - } - } - Self::DifferentPort { http, ws } => { - if let Some((server, module)) = - http.and_then(|server| http_module.map(|module| (server, module))) - { - http_handle = Some(server.start(module)); - } - if let Some((server, module)) = - ws.and_then(|server| ws_module.map(|module| (server, module))) - { - ws_handle = Some(server.start(module)); - } - } - } - - Ok((http_handle, ws_handle)) - } -} - -impl Default for WsHttpServers { - fn default() -> Self { - Self::DifferentPort { http: None, ws: None } - } -} - -/// Container type for each transport ie. http, ws, and ipc server -pub struct RpcServer { - /// Configured ws,http servers - ws_http: WsHttpServer, - /// ipc server - ipc: Option>>, -} - -// === impl RpcServer === - -impl RpcServer { - fn empty() -> Self { - Self { ws_http: Default::default(), ipc: None } - } - - /// Returns the [`SocketAddr`] of the http server if started. - pub const fn http_local_addr(&self) -> Option { - self.ws_http.http_local_addr - } - /// Return the `JwtSecret` of the server - pub const fn jwt(&self) -> Option { - self.ws_http.jwt_secret - } - - /// Returns the [`SocketAddr`] of the ws server if started. - pub const fn ws_local_addr(&self) -> Option { - self.ws_http.ws_local_addr - } - - /// Returns the endpoint of the ipc server if started. - pub fn ipc_endpoint(&self) -> Option { - self.ipc.as_ref().map(|ipc| ipc.endpoint()) - } - - /// Starts the configured server by spawning the servers on the tokio runtime. - /// - /// This returns an [RpcServerHandle] that's connected to the server task(s) until the server is - /// stopped or the [RpcServerHandle] is dropped. - #[instrument(name = "start", skip_all, fields(http = ?self.http_local_addr(), ws = ?self.ws_local_addr(), ipc = ?self.ipc_endpoint()), target = "rpc", level = "TRACE")] - pub async fn start(self, modules: TransportRpcModules) -> Result { - trace!(target: "rpc", "staring RPC server"); - let Self { ws_http, ipc: ipc_server } = self; - let TransportRpcModules { config, http, ws, ipc } = modules; - let mut handle = RpcServerHandle { - http_local_addr: ws_http.http_local_addr, - ws_local_addr: ws_http.ws_local_addr, - http: None, - ws: None, - ipc_endpoint: None, - ipc: None, - jwt_secret: None, - }; - - let (http, ws) = ws_http.server.start(http, ws, &config)?; - handle.http = http; - handle.ws = ws; - - if let Some((server, module)) = - ipc_server.and_then(|server| ipc.map(|module| (server, module))) - { - handle.ipc_endpoint = Some(server.endpoint()); - handle.ipc = Some(server.start(module).await?); - } - - Ok(handle) - } -} - -impl fmt::Debug for RpcServer { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("RpcServer") - .field("http", &self.ws_http.http_local_addr.is_some()) - .field("ws", &self.ws_http.ws_local_addr.is_some()) - .field("ipc", &self.ipc.is_some()) - .finish() - } } /// A handle to the spawned servers. diff --git a/crates/rpc/rpc-builder/tests/it/startup.rs b/crates/rpc/rpc-builder/tests/it/startup.rs index 4c873f2b38c9..4f5a37aa01fb 100644 --- a/crates/rpc/rpc-builder/tests/it/startup.rs +++ b/crates/rpc/rpc-builder/tests/it/startup.rs @@ -28,9 +28,8 @@ async fn test_http_addr_in_use() { let builder = test_rpc_builder(); let server = builder .build(TransportRpcModuleConfig::set_http(vec![RethRpcModule::Admin]), EthApiBuild::build); - let result = server - .start_server(RpcServerConfig::http(Default::default()).with_http_address(addr)) - .await; + let mut config = RpcServerConfig::http(Default::default()).with_http_address(addr); + let result = config.start(&server).await; let err = result.unwrap_err(); assert!(is_addr_in_use_kind(&err, ServerKind::Http(addr)), "{err}"); } @@ -42,8 +41,8 @@ async fn test_ws_addr_in_use() { let builder = test_rpc_builder(); let server = builder .build(TransportRpcModuleConfig::set_ws(vec![RethRpcModule::Admin]), EthApiBuild::build); - let result = - server.start_server(RpcServerConfig::ws(Default::default()).with_ws_address(addr)).await; + let mut config = RpcServerConfig::ws(Default::default()).with_ws_address(addr); + let result = config.start(&server).await; let err = result.unwrap_err(); assert!(is_addr_in_use_kind(&err, ServerKind::WS(addr)), "{err}"); } @@ -65,14 +64,11 @@ async fn test_launch_same_port_different_modules() { EthApiBuild::build, ); let addr = test_address(); - let res = server - .start_server( - RpcServerConfig::ws(Default::default()) - .with_ws_address(addr) - .with_http(Default::default()) - .with_http_address(addr), - ) - .await; + let mut config = RpcServerConfig::ws(Default::default()) + .with_ws_address(addr) + .with_http(Default::default()) + .with_http_address(addr); + let res = config.start(&server).await; let err = res.unwrap_err(); assert!(matches!( err, @@ -89,16 +85,13 @@ async fn test_launch_same_port_same_cors() { EthApiBuild::build, ); let addr = test_address(); - let res = server - .start_server( - RpcServerConfig::ws(Default::default()) - .with_ws_address(addr) - .with_http(Default::default()) - .with_cors(Some("*".to_string())) - .with_http_cors(Some("*".to_string())) - .with_http_address(addr), - ) - .await; + let mut config = RpcServerConfig::ws(Default::default()) + .with_ws_address(addr) + .with_http(Default::default()) + .with_cors(Some("*".to_string())) + .with_http_cors(Some("*".to_string())) + .with_http_address(addr); + let res = config.start(&server).await; assert!(res.is_ok()); } @@ -111,16 +104,13 @@ async fn test_launch_same_port_different_cors() { EthApiBuild::build, ); let addr = test_address(); - let res = server - .start_server( - RpcServerConfig::ws(Default::default()) - .with_ws_address(addr) - .with_http(Default::default()) - .with_cors(Some("*".to_string())) - .with_http_cors(Some("example".to_string())) - .with_http_address(addr), - ) - .await; + let mut config = RpcServerConfig::ws(Default::default()) + .with_ws_address(addr) + .with_http(Default::default()) + .with_cors(Some("*".to_string())) + .with_http_cors(Some("example".to_string())) + .with_http_address(addr); + let res = config.start(&server).await; let err = res.unwrap_err(); assert!(matches!( err, diff --git a/crates/rpc/rpc-builder/tests/it/utils.rs b/crates/rpc/rpc-builder/tests/it/utils.rs index 85c9dbeac3f2..7a1644c8f136 100644 --- a/crates/rpc/rpc-builder/tests/it/utils.rs +++ b/crates/rpc/rpc-builder/tests/it/utils.rs @@ -53,20 +53,16 @@ pub async fn launch_auth(secret: JwtSecret) -> AuthServerHandle { pub async fn launch_http(modules: impl Into) -> RpcServerHandle { let builder = test_rpc_builder(); let server = builder.build(TransportRpcModuleConfig::set_http(modules), EthApiBuild::build); - server - .start_server(RpcServerConfig::http(Default::default()).with_http_address(test_address())) - .await - .unwrap() + let mut config = RpcServerConfig::http(Default::default()).with_http_address(test_address()); + config.start(&server).await.unwrap() } /// Launches a new server with ws only with the given modules pub async fn launch_ws(modules: impl Into) -> RpcServerHandle { let builder = test_rpc_builder(); let server = builder.build(TransportRpcModuleConfig::set_ws(modules), EthApiBuild::build); - server - .start_server(RpcServerConfig::ws(Default::default()).with_ws_address(test_address())) - .await - .unwrap() + let mut config = RpcServerConfig::ws(Default::default()).with_http_address(test_address()); + config.start(&server).await.unwrap() } /// Launches a new server with http and ws and with the given modules @@ -77,15 +73,11 @@ pub async fn launch_http_ws(modules: impl Into) -> RpcServer TransportRpcModuleConfig::set_ws(modules.clone()).with_http(modules), EthApiBuild::build, ); - server - .start_server( - RpcServerConfig::ws(Default::default()) - .with_ws_address(test_address()) - .with_http(Default::default()) - .with_http_address(test_address()), - ) - .await - .unwrap() + let mut config = RpcServerConfig::ws(Default::default()) + .with_ws_address(test_address()) + .with_http(Default::default()) + .with_http_address(test_address()); + config.start(&server).await.unwrap() } /// Launches a new server with http and ws and with the given modules on the same port. @@ -97,15 +89,11 @@ pub async fn launch_http_ws_same_port(modules: impl Into) -> EthApiBuild::build, ); let addr = test_address(); - server - .start_server( - RpcServerConfig::ws(Default::default()) - .with_ws_address(addr) - .with_http(Default::default()) - .with_http_address(addr), - ) - .await - .unwrap() + let mut config = RpcServerConfig::ws(Default::default()) + .with_ws_address(addr) + .with_http(Default::default()) + .with_http_address(addr); + config.start(&server).await.unwrap() } /// Returns an [`RpcModuleBuilder`] with testing components. diff --git a/examples/rpc-db/src/main.rs b/examples/rpc-db/src/main.rs index 85ad28d6a051..809129c81c2c 100644 --- a/examples/rpc-db/src/main.rs +++ b/examples/rpc-db/src/main.rs @@ -77,9 +77,9 @@ async fn main() -> eyre::Result<()> { server.merge_configured(custom_rpc.into_rpc())?; // Start the server & keep it alive - let server_args = + let mut server_args = RpcServerConfig::http(Default::default()).with_http_address("0.0.0.0:8545".parse()?); - let _handle = server_args.start(server).await?; + let _handle = server_args.start(&server).await?; futures::future::pending::<()>().await; Ok(()) From b5b129bd15e0b071ed7658f1c1b14eb0de88b5f4 Mon Sep 17 00:00:00 2001 From: Sean Matt Date: Mon, 8 Jul 2024 11:49:35 -0400 Subject: [PATCH 02/12] Update crates/rpc/rpc-builder/src/lib.rs Co-authored-by: Matthias Seitz --- crates/rpc/rpc-builder/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/rpc/rpc-builder/src/lib.rs b/crates/rpc/rpc-builder/src/lib.rs index 458423a24633..240954fba6c8 100644 --- a/crates/rpc/rpc-builder/src/lib.rs +++ b/crates/rpc/rpc-builder/src/lib.rs @@ -1289,7 +1289,7 @@ impl RpcServerConfig { /// /// If both are on the same port, they are combined into one server. pub async fn start( - &mut self, + self, modules: &TransportRpcModules, ) -> Result { let mut http_handle = None; From 811917f3ed0cd278503710d84023ad6039d59241 Mon Sep 17 00:00:00 2001 From: smatthewenglish Date: Mon, 8 Jul 2024 14:01:32 -0400 Subject: [PATCH 03/12] fix according to review feedback --- Cargo.toml | 2 +- crates/node/builder/src/rpc.rs | 42 +++-- crates/rpc/rpc-builder/src/lib.rs | 189 +++++++++++++-------- crates/rpc/rpc-builder/tests/it/startup.rs | 10 +- crates/rpc/rpc-builder/tests/it/utils.rs | 30 ++-- examples/rpc-db/src/main.rs | 2 +- 6 files changed, 177 insertions(+), 98 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 1a0a112e23ab..84b4bfb81afb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -535,4 +535,4 @@ proptest-derive = "0.5" serial_test = "3" similar-asserts = "1.5.0" test-fuzz = "5" -iai-callgrind = "0.11.1" +iai-callgrind = "0.11" diff --git a/crates/node/builder/src/rpc.rs b/crates/node/builder/src/rpc.rs index 84761f24ac4f..8773d0a34169 100644 --- a/crates/node/builder/src/rpc.rs +++ b/crates/node/builder/src/rpc.rs @@ -288,19 +288,33 @@ where extend_rpc_modules.extend_rpc_modules(ctx)?; - let mut server_config = config.rpc.rpc_server_config(); + let server_config = config.rpc.rpc_server_config(); + // let cloned_modules = modules.clone(); + // let launch_rpc = server_config.start(&cloned_modules).await?; + + // if let Some(path) = launch_rpc.ipc_endpoint() { + // info!(target: "reth::cli", %path, "RPC IPC server started"); + // } + // if let Some(addr) = launch_rpc.http_local_addr() { + // info!(target: "reth::cli", url=%addr, "RPC HTTP server started"); + // } + // if let Some(addr) = launch_rpc.ws_local_addr() { + // info!(target: "reth::cli", url=%addr, "RPC WS server started"); + // } + //let launch_rpc = modules.clone().start_server(server_config).map_ok(|handle| { let cloned_modules = modules.clone(); - let launch_rpc = server_config.start(&cloned_modules).await?; - - if let Some(path) = launch_rpc.ipc_endpoint() { - info!(target: "reth::cli", %path, "RPC IPC server started"); - } - if let Some(addr) = launch_rpc.http_local_addr() { - info!(target: "reth::cli", url=%addr, "RPC HTTP server started"); - } - if let Some(addr) = launch_rpc.ws_local_addr() { - info!(target: "reth::cli", url=%addr, "RPC WS server started"); - } + let launch_rpc = server_config.start(&cloned_modules).map_ok(|handle| { + if let Some(path) = handle.ipc_endpoint() { + info!(target: "reth::cli", %path, "RPC IPC server started"); + } + if let Some(addr) = handle.http_local_addr() { + info!(target: "reth::cli", url=%addr, "RPC HTTP server started"); + } + if let Some(addr) = handle.ws_local_addr() { + info!(target: "reth::cli", url=%addr, "RPC WS server started"); + } + handle + }); let launch_auth = auth_module.clone().start_server(auth_config).map_ok(|handle| { let addr = handle.local_addr(); @@ -313,7 +327,9 @@ where }); // launch servers concurrently - let handles = RethRpcServerHandles { rpc: launch_rpc, auth: launch_auth.await? }; + //let handles = RethRpcServerHandles { rpc: launch_rpc, auth: launch_auth.await? }; + let (rpc, auth) = futures::future::try_join(launch_rpc, launch_auth).await?; + let handles = RethRpcServerHandles { rpc, auth }; let ctx = RpcContext { node, diff --git a/crates/rpc/rpc-builder/src/lib.rs b/crates/rpc/rpc-builder/src/lib.rs index 240954fba6c8..c5c3efd4d6b2 100644 --- a/crates/rpc/rpc-builder/src/lib.rs +++ b/crates/rpc/rpc-builder/src/lib.rs @@ -1281,20 +1281,16 @@ impl RpcServerConfig { } /// Creates the [`AuthLayer`] if any - fn maybe_jwt_layer(&self) -> Option> { - self.jwt_secret.map(|secret| AuthLayer::new(JwtAuthValidator::new(secret))) - } + // fn maybe_jwt_layer(&self) -> Option> { + // self.jwt_secret.map(|secret| AuthLayer::new(JwtAuthValidator::new(secret))) + // } /// Builds and starts the ws and http server(s). /// /// If both are on the same port, they are combined into one server. - pub async fn start( - self, - modules: &TransportRpcModules, - ) -> Result { + pub async fn start(self, modules: &TransportRpcModules) -> Result { let mut http_handle = None; let mut ws_handle = None; - let mut ipc_handle = None; let http_socket_addr = self.http_addr.unwrap_or(SocketAddr::V4(SocketAddrV4::new( Ipv4Addr::LOCALHOST, @@ -1310,12 +1306,16 @@ impl RpcServerConfig { let ipc_path = self.ipc_endpoint.clone().unwrap_or_else(|| constants::DEFAULT_IPC_ENDPOINT.into()); - if let Some(builder) = self.ipc_server_config.take() { - let ipc = builder - .set_rpc_middleware(IpcRpcServiceBuilder::new().layer(metrics)) - .build(ipc_path); - ipc_handle = Some(ipc.start(modules.ipc.clone().expect("ipc server error")).await?); - } + // if let Some(builder) = self.ipc_server_config.take() { + // let ipc = builder + // .set_rpc_middleware(IpcRpcServiceBuilder::new().layer(metrics)) + // .build(ipc_path); + // ipc_handle = Some(ipc.start(modules.ipc.clone().expect("ipc server error")).await?); + // } + let builder = self.ipc_server_config.expect("Expected a value, but found None"); + let ipc = + builder.set_rpc_middleware(IpcRpcServiceBuilder::new().layer(metrics)).build(ipc_path); + let ipc_handle = Some(ipc.start(modules.ipc.clone().expect("ipc server error")).await?); // If both are configured on the same port, we combine them into one server. if self.http_addr == self.ws_addr && @@ -1338,16 +1338,21 @@ impl RpcServerConfig { .cloned(); // we merge this into one server using the http setup - self.ws_server_config.take(); + //self.ws_server_config.take(); modules.config.ensure_ws_http_identical()?; - let builder = self.http_server_config.take().expect("http_server_config is Some"); + //let builder = self.http_server_config.take().expect("http_server_config is Some"); + let builder = self.http_server_config.expect("Expected a value, but found None"); + let server = builder .set_http_middleware( tower::ServiceBuilder::new() .option_layer(Self::maybe_cors_layer(cors)?) - .option_layer(self.maybe_jwt_layer()), + .option_layer( + self.jwt_secret + .map(|secret| AuthLayer::new(JwtAuthValidator::new(secret))), + ), ) .set_rpc_middleware( RpcServiceBuilder::new().layer( @@ -1381,59 +1386,107 @@ impl RpcServerConfig { }); } - let mut http_local_addr = None; - let mut http_server = None; - - let mut ws_local_addr = None; - let mut ws_server = None; - if let Some(builder) = self.ws_server_config.take() { - let server = builder - .ws_only() - .set_http_middleware( - tower::ServiceBuilder::new() - .option_layer(Self::maybe_cors_layer(self.ws_cors_domains.clone())?) - .option_layer(self.maybe_jwt_layer()), - ) - .set_rpc_middleware( - RpcServiceBuilder::new() - .layer(modules.ws.as_ref().map(RpcRequestMetrics::ws).unwrap_or_default()), - ) - .build(ws_socket_addr) - .await - .map_err(|err| RpcError::server_error(err, ServerKind::WS(ws_socket_addr)))?; - let addr = server - .local_addr() - .map_err(|err| RpcError::server_error(err, ServerKind::WS(ws_socket_addr)))?; - - ws_local_addr = Some(addr); - ws_server = Some(server); - } - - if let Some(builder) = self.http_server_config.take() { - let server = builder - .http_only() - .set_http_middleware( - tower::ServiceBuilder::new() - .option_layer(Self::maybe_cors_layer(self.http_cors_domains.clone())?) - .option_layer(self.maybe_jwt_layer()), - ) - .set_rpc_middleware( - RpcServiceBuilder::new().layer( - modules.http.as_ref().map(RpcRequestMetrics::http).unwrap_or_default(), + //let mut http_local_addr = None; + //let mut http_server = None; + + //let mut ws_local_addr = None; + //let mut ws_server = None; + + // if let Some(builder) = self.ws_server_config.take() { + // let server = builder + // .ws_only() + // .set_http_middleware( + // tower::ServiceBuilder::new() + // .option_layer(Self::maybe_cors_layer(self.ws_cors_domains.clone())?) + // .option_layer(self.jwt_secret.map(|secret| + // AuthLayer::new(JwtAuthValidator::new(secret)))), ) + // .set_rpc_middleware( + // RpcServiceBuilder::new() + // + // .layer(modules.ws.as_ref().map(RpcRequestMetrics::ws).unwrap_or_default()), + // ) + // .build(ws_socket_addr) + // .await + // .map_err(|err| RpcError::server_error(err, ServerKind::WS(ws_socket_addr)))?; + // let addr = server + // .local_addr() + // .map_err(|err| RpcError::server_error(err, ServerKind::WS(ws_socket_addr)))?; + + // ws_local_addr = Some(addr); + // ws_server = Some(server); + // } + let builder = self.ws_server_config.expect("Expected a value, but found None"); + let server = builder + .ws_only() + .set_http_middleware( + tower::ServiceBuilder::new() + .option_layer(Self::maybe_cors_layer(self.ws_cors_domains.clone())?) + .option_layer( + self.jwt_secret.map(|secret| AuthLayer::new(JwtAuthValidator::new(secret))), ), - ) - .build(http_socket_addr) - .await - .map_err(|err| RpcError::server_error(err, ServerKind::Http(http_socket_addr)))?; - let local_addr = server - .local_addr() - .map_err(|err| RpcError::server_error(err, ServerKind::Http(http_socket_addr)))?; - http_local_addr = Some(local_addr); - http_server = Some(server); - } - http_handle = http_server + ) + .set_rpc_middleware( + RpcServiceBuilder::new() + .layer(modules.ws.as_ref().map(RpcRequestMetrics::ws).unwrap_or_default()), + ) + .build(ws_socket_addr) + .await + .map_err(|err| RpcError::server_error(err, ServerKind::WS(ws_socket_addr)))?; + let addr = server + .local_addr() + .map_err(|err| RpcError::server_error(err, ServerKind::WS(ws_socket_addr)))?; + + let ws_local_addr = Some(addr); + let ws_server = Some(server); + + // if let Some(builder) = self.http_server_config.take() { + // let server = builder + // .http_only() + // .set_http_middleware( + // tower::ServiceBuilder::new() + // .option_layer(Self::maybe_cors_layer(self.http_cors_domains.clone())?) + // .option_layer(self.jwt_secret.map(|secret| + // AuthLayer::new(JwtAuthValidator::new(secret)))), ) + // .set_rpc_middleware( + // RpcServiceBuilder::new().layer( + // modules.http.as_ref().map(RpcRequestMetrics::http).unwrap_or_default(), + // ), + // ) + // .build(http_socket_addr) + // .await + // .map_err(|err| RpcError::server_error(err, ServerKind::Http(http_socket_addr)))?; + // let local_addr = server + // .local_addr() + // .map_err(|err| RpcError::server_error(err, ServerKind::Http(http_socket_addr)))?; + // http_local_addr = Some(local_addr); + // http_server = Some(server); + // } + let builder = self.http_server_config.expect("Expected a value, but found None"); + let server = builder + .http_only() + .set_http_middleware( + tower::ServiceBuilder::new() + .option_layer(Self::maybe_cors_layer(self.http_cors_domains.clone())?) + .option_layer( + self.jwt_secret.map(|secret| AuthLayer::new(JwtAuthValidator::new(secret))), + ), + ) + .set_rpc_middleware( + RpcServiceBuilder::new() + .layer(modules.http.as_ref().map(RpcRequestMetrics::http).unwrap_or_default()), + ) + .build(http_socket_addr) + .await + .map_err(|err| RpcError::server_error(err, ServerKind::Http(http_socket_addr)))?; + let local_addr = server + .local_addr() + .map_err(|err| RpcError::server_error(err, ServerKind::Http(http_socket_addr)))?; + let http_local_addr = Some(local_addr); + let http_server = Some(server); + + let http_handle = http_server .map(|http_server| http_server.start(modules.http.clone().expect("http server error"))); - ws_handle = ws_server + let ws_handle = ws_server .map(|ws_server| ws_server.start(modules.ws.clone().expect("ws server error"))); Ok(RpcServerHandle { http_local_addr, diff --git a/crates/rpc/rpc-builder/tests/it/startup.rs b/crates/rpc/rpc-builder/tests/it/startup.rs index 4f5a37aa01fb..ca9dbdd383e4 100644 --- a/crates/rpc/rpc-builder/tests/it/startup.rs +++ b/crates/rpc/rpc-builder/tests/it/startup.rs @@ -28,7 +28,7 @@ async fn test_http_addr_in_use() { let builder = test_rpc_builder(); let server = builder .build(TransportRpcModuleConfig::set_http(vec![RethRpcModule::Admin]), EthApiBuild::build); - let mut config = RpcServerConfig::http(Default::default()).with_http_address(addr); + let config = RpcServerConfig::http(Default::default()).with_http_address(addr); let result = config.start(&server).await; let err = result.unwrap_err(); assert!(is_addr_in_use_kind(&err, ServerKind::Http(addr)), "{err}"); @@ -41,7 +41,7 @@ async fn test_ws_addr_in_use() { let builder = test_rpc_builder(); let server = builder .build(TransportRpcModuleConfig::set_ws(vec![RethRpcModule::Admin]), EthApiBuild::build); - let mut config = RpcServerConfig::ws(Default::default()).with_ws_address(addr); + let config = RpcServerConfig::ws(Default::default()).with_ws_address(addr); let result = config.start(&server).await; let err = result.unwrap_err(); assert!(is_addr_in_use_kind(&err, ServerKind::WS(addr)), "{err}"); @@ -64,7 +64,7 @@ async fn test_launch_same_port_different_modules() { EthApiBuild::build, ); let addr = test_address(); - let mut config = RpcServerConfig::ws(Default::default()) + let config = RpcServerConfig::ws(Default::default()) .with_ws_address(addr) .with_http(Default::default()) .with_http_address(addr); @@ -85,7 +85,7 @@ async fn test_launch_same_port_same_cors() { EthApiBuild::build, ); let addr = test_address(); - let mut config = RpcServerConfig::ws(Default::default()) + let config = RpcServerConfig::ws(Default::default()) .with_ws_address(addr) .with_http(Default::default()) .with_cors(Some("*".to_string())) @@ -104,7 +104,7 @@ async fn test_launch_same_port_different_cors() { EthApiBuild::build, ); let addr = test_address(); - let mut config = RpcServerConfig::ws(Default::default()) + let config = RpcServerConfig::ws(Default::default()) .with_ws_address(addr) .with_http(Default::default()) .with_cors(Some("*".to_string())) diff --git a/crates/rpc/rpc-builder/tests/it/utils.rs b/crates/rpc/rpc-builder/tests/it/utils.rs index 7a1644c8f136..ea9954f23c10 100644 --- a/crates/rpc/rpc-builder/tests/it/utils.rs +++ b/crates/rpc/rpc-builder/tests/it/utils.rs @@ -53,16 +53,22 @@ pub async fn launch_auth(secret: JwtSecret) -> AuthServerHandle { pub async fn launch_http(modules: impl Into) -> RpcServerHandle { let builder = test_rpc_builder(); let server = builder.build(TransportRpcModuleConfig::set_http(modules), EthApiBuild::build); - let mut config = RpcServerConfig::http(Default::default()).with_http_address(test_address()); - config.start(&server).await.unwrap() + RpcServerConfig::http(Default::default()) + .with_http_address(test_address()) + .start(&server) + .await + .unwrap() } /// Launches a new server with ws only with the given modules pub async fn launch_ws(modules: impl Into) -> RpcServerHandle { let builder = test_rpc_builder(); let server = builder.build(TransportRpcModuleConfig::set_ws(modules), EthApiBuild::build); - let mut config = RpcServerConfig::ws(Default::default()).with_http_address(test_address()); - config.start(&server).await.unwrap() + RpcServerConfig::ws(Default::default()) + .with_http_address(test_address()) + .start(&server) + .await + .unwrap() } /// Launches a new server with http and ws and with the given modules @@ -73,11 +79,13 @@ pub async fn launch_http_ws(modules: impl Into) -> RpcServer TransportRpcModuleConfig::set_ws(modules.clone()).with_http(modules), EthApiBuild::build, ); - let mut config = RpcServerConfig::ws(Default::default()) + RpcServerConfig::ws(Default::default()) .with_ws_address(test_address()) .with_http(Default::default()) - .with_http_address(test_address()); - config.start(&server).await.unwrap() + .with_http_address(test_address()) + .start(&server) + .await + .unwrap() } /// Launches a new server with http and ws and with the given modules on the same port. @@ -89,11 +97,13 @@ pub async fn launch_http_ws_same_port(modules: impl Into) -> EthApiBuild::build, ); let addr = test_address(); - let mut config = RpcServerConfig::ws(Default::default()) + RpcServerConfig::ws(Default::default()) .with_ws_address(addr) .with_http(Default::default()) - .with_http_address(addr); - config.start(&server).await.unwrap() + .with_http_address(addr) + .start(&server) + .await + .unwrap() } /// Returns an [`RpcModuleBuilder`] with testing components. diff --git a/examples/rpc-db/src/main.rs b/examples/rpc-db/src/main.rs index 809129c81c2c..30c0479549fc 100644 --- a/examples/rpc-db/src/main.rs +++ b/examples/rpc-db/src/main.rs @@ -77,7 +77,7 @@ async fn main() -> eyre::Result<()> { server.merge_configured(custom_rpc.into_rpc())?; // Start the server & keep it alive - let mut server_args = + let server_args = RpcServerConfig::http(Default::default()).with_http_address("0.0.0.0:8545".parse()?); let _handle = server_args.start(&server).await?; futures::future::pending::<()>().await; From dfe8ca78920c2d74ca23b965dcf0acd83fbd331e Mon Sep 17 00:00:00 2001 From: smatthewenglish Date: Mon, 8 Jul 2024 14:12:16 -0400 Subject: [PATCH 04/12] update --- crates/node/builder/src/rpc.rs | 14 ----------- crates/rpc/rpc-builder/src/lib.rs | 2 +- crates/rpc/rpc-builder/tests/it/startup.rs | 28 ++++++++++++---------- 3 files changed, 16 insertions(+), 28 deletions(-) diff --git a/crates/node/builder/src/rpc.rs b/crates/node/builder/src/rpc.rs index 8773d0a34169..03ae899cba8b 100644 --- a/crates/node/builder/src/rpc.rs +++ b/crates/node/builder/src/rpc.rs @@ -289,19 +289,6 @@ where extend_rpc_modules.extend_rpc_modules(ctx)?; let server_config = config.rpc.rpc_server_config(); - // let cloned_modules = modules.clone(); - // let launch_rpc = server_config.start(&cloned_modules).await?; - - // if let Some(path) = launch_rpc.ipc_endpoint() { - // info!(target: "reth::cli", %path, "RPC IPC server started"); - // } - // if let Some(addr) = launch_rpc.http_local_addr() { - // info!(target: "reth::cli", url=%addr, "RPC HTTP server started"); - // } - // if let Some(addr) = launch_rpc.ws_local_addr() { - // info!(target: "reth::cli", url=%addr, "RPC WS server started"); - // } - //let launch_rpc = modules.clone().start_server(server_config).map_ok(|handle| { let cloned_modules = modules.clone(); let launch_rpc = server_config.start(&cloned_modules).map_ok(|handle| { if let Some(path) = handle.ipc_endpoint() { @@ -327,7 +314,6 @@ where }); // launch servers concurrently - //let handles = RethRpcServerHandles { rpc: launch_rpc, auth: launch_auth.await? }; let (rpc, auth) = futures::future::try_join(launch_rpc, launch_auth).await?; let handles = RethRpcServerHandles { rpc, auth }; diff --git a/crates/rpc/rpc-builder/src/lib.rs b/crates/rpc/rpc-builder/src/lib.rs index c5c3efd4d6b2..05579dbc498a 100644 --- a/crates/rpc/rpc-builder/src/lib.rs +++ b/crates/rpc/rpc-builder/src/lib.rs @@ -115,7 +115,7 @@ //! //! // start the servers //! let auth_config = AuthServerConfig::builder(JwtSecret::random()).build(); -//! let mut config = RpcServerConfig::default(); +//! let config = RpcServerConfig::default(); //! //! let (_rpc_handle, _auth_handle) = //! try_join!(config.start(&modules), auth_module.start_server(auth_config),).unwrap(); diff --git a/crates/rpc/rpc-builder/tests/it/startup.rs b/crates/rpc/rpc-builder/tests/it/startup.rs index ca9dbdd383e4..5680d03a5307 100644 --- a/crates/rpc/rpc-builder/tests/it/startup.rs +++ b/crates/rpc/rpc-builder/tests/it/startup.rs @@ -28,8 +28,8 @@ async fn test_http_addr_in_use() { let builder = test_rpc_builder(); let server = builder .build(TransportRpcModuleConfig::set_http(vec![RethRpcModule::Admin]), EthApiBuild::build); - let config = RpcServerConfig::http(Default::default()).with_http_address(addr); - let result = config.start(&server).await; + let result = + RpcServerConfig::http(Default::default()).with_http_address(addr).start(&server).await; let err = result.unwrap_err(); assert!(is_addr_in_use_kind(&err, ServerKind::Http(addr)), "{err}"); } @@ -41,8 +41,7 @@ async fn test_ws_addr_in_use() { let builder = test_rpc_builder(); let server = builder .build(TransportRpcModuleConfig::set_ws(vec![RethRpcModule::Admin]), EthApiBuild::build); - let config = RpcServerConfig::ws(Default::default()).with_ws_address(addr); - let result = config.start(&server).await; + let result = RpcServerConfig::ws(Default::default()).with_ws_address(addr).start(&server).await; let err = result.unwrap_err(); assert!(is_addr_in_use_kind(&err, ServerKind::WS(addr)), "{err}"); } @@ -64,11 +63,12 @@ async fn test_launch_same_port_different_modules() { EthApiBuild::build, ); let addr = test_address(); - let config = RpcServerConfig::ws(Default::default()) + let res = RpcServerConfig::ws(Default::default()) .with_ws_address(addr) .with_http(Default::default()) - .with_http_address(addr); - let res = config.start(&server).await; + .with_http_address(addr) + .start(&server) + .await; let err = res.unwrap_err(); assert!(matches!( err, @@ -85,13 +85,14 @@ async fn test_launch_same_port_same_cors() { EthApiBuild::build, ); let addr = test_address(); - let config = RpcServerConfig::ws(Default::default()) + let res = RpcServerConfig::ws(Default::default()) .with_ws_address(addr) .with_http(Default::default()) .with_cors(Some("*".to_string())) .with_http_cors(Some("*".to_string())) - .with_http_address(addr); - let res = config.start(&server).await; + .with_http_address(addr) + .start(&server) + .await; assert!(res.is_ok()); } @@ -104,13 +105,14 @@ async fn test_launch_same_port_different_cors() { EthApiBuild::build, ); let addr = test_address(); - let config = RpcServerConfig::ws(Default::default()) + let res = RpcServerConfig::ws(Default::default()) .with_ws_address(addr) .with_http(Default::default()) .with_cors(Some("*".to_string())) .with_http_cors(Some("example".to_string())) - .with_http_address(addr); - let res = config.start(&server).await; + .with_http_address(addr) + .start(&server) + .await; let err = res.unwrap_err(); assert!(matches!( err, From f90f1a2d12effe5b1b4608ecf029f04fcc621d88 Mon Sep 17 00:00:00 2001 From: smatthewenglish Date: Mon, 8 Jul 2024 14:15:39 -0400 Subject: [PATCH 05/12] update --- crates/rpc/rpc-builder/src/lib.rs | 66 ++----------------------------- 1 file changed, 4 insertions(+), 62 deletions(-) diff --git a/crates/rpc/rpc-builder/src/lib.rs b/crates/rpc/rpc-builder/src/lib.rs index 05579dbc498a..2cebe5aa73e3 100644 --- a/crates/rpc/rpc-builder/src/lib.rs +++ b/crates/rpc/rpc-builder/src/lib.rs @@ -55,8 +55,10 @@ //! evm_config, //! ) //! .build(transports, EthApiBuild::build); -//! let mut config = RpcServerConfig::default().with_http(ServerBuilder::default()); -//! config.start(&transport_modules).await; +//! RpcServerConfig::default() +//! .with_http(ServerBuilder::default()) +//! .start(&transport_modules) +//! .await; //! } //! ``` //! @@ -1306,12 +1308,6 @@ impl RpcServerConfig { let ipc_path = self.ipc_endpoint.clone().unwrap_or_else(|| constants::DEFAULT_IPC_ENDPOINT.into()); - // if let Some(builder) = self.ipc_server_config.take() { - // let ipc = builder - // .set_rpc_middleware(IpcRpcServiceBuilder::new().layer(metrics)) - // .build(ipc_path); - // ipc_handle = Some(ipc.start(modules.ipc.clone().expect("ipc server error")).await?); - // } let builder = self.ipc_server_config.expect("Expected a value, but found None"); let ipc = builder.set_rpc_middleware(IpcRpcServiceBuilder::new().layer(metrics)).build(ipc_path); @@ -1338,11 +1334,8 @@ impl RpcServerConfig { .cloned(); // we merge this into one server using the http setup - //self.ws_server_config.take(); - modules.config.ensure_ws_http_identical()?; - //let builder = self.http_server_config.take().expect("http_server_config is Some"); let builder = self.http_server_config.expect("Expected a value, but found None"); let server = builder @@ -1386,35 +1379,6 @@ impl RpcServerConfig { }); } - //let mut http_local_addr = None; - //let mut http_server = None; - - //let mut ws_local_addr = None; - //let mut ws_server = None; - - // if let Some(builder) = self.ws_server_config.take() { - // let server = builder - // .ws_only() - // .set_http_middleware( - // tower::ServiceBuilder::new() - // .option_layer(Self::maybe_cors_layer(self.ws_cors_domains.clone())?) - // .option_layer(self.jwt_secret.map(|secret| - // AuthLayer::new(JwtAuthValidator::new(secret)))), ) - // .set_rpc_middleware( - // RpcServiceBuilder::new() - // - // .layer(modules.ws.as_ref().map(RpcRequestMetrics::ws).unwrap_or_default()), - // ) - // .build(ws_socket_addr) - // .await - // .map_err(|err| RpcError::server_error(err, ServerKind::WS(ws_socket_addr)))?; - // let addr = server - // .local_addr() - // .map_err(|err| RpcError::server_error(err, ServerKind::WS(ws_socket_addr)))?; - - // ws_local_addr = Some(addr); - // ws_server = Some(server); - // } let builder = self.ws_server_config.expect("Expected a value, but found None"); let server = builder .ws_only() @@ -1439,28 +1403,6 @@ impl RpcServerConfig { let ws_local_addr = Some(addr); let ws_server = Some(server); - // if let Some(builder) = self.http_server_config.take() { - // let server = builder - // .http_only() - // .set_http_middleware( - // tower::ServiceBuilder::new() - // .option_layer(Self::maybe_cors_layer(self.http_cors_domains.clone())?) - // .option_layer(self.jwt_secret.map(|secret| - // AuthLayer::new(JwtAuthValidator::new(secret)))), ) - // .set_rpc_middleware( - // RpcServiceBuilder::new().layer( - // modules.http.as_ref().map(RpcRequestMetrics::http).unwrap_or_default(), - // ), - // ) - // .build(http_socket_addr) - // .await - // .map_err(|err| RpcError::server_error(err, ServerKind::Http(http_socket_addr)))?; - // let local_addr = server - // .local_addr() - // .map_err(|err| RpcError::server_error(err, ServerKind::Http(http_socket_addr)))?; - // http_local_addr = Some(local_addr); - // http_server = Some(server); - // } let builder = self.http_server_config.expect("Expected a value, but found None"); let server = builder .http_only() From f8815585ea8ac6873909e7a43a5a3562f102144e Mon Sep 17 00:00:00 2001 From: smatthewenglish Date: Mon, 8 Jul 2024 14:26:47 -0400 Subject: [PATCH 06/12] update --- crates/rpc/rpc-builder/src/lib.rs | 122 +++++++++++++++++++----------- 1 file changed, 76 insertions(+), 46 deletions(-) diff --git a/crates/rpc/rpc-builder/src/lib.rs b/crates/rpc/rpc-builder/src/lib.rs index 2cebe5aa73e3..f8dc1c78a7bc 100644 --- a/crates/rpc/rpc-builder/src/lib.rs +++ b/crates/rpc/rpc-builder/src/lib.rs @@ -1379,52 +1379,82 @@ impl RpcServerConfig { }); } - let builder = self.ws_server_config.expect("Expected a value, but found None"); - let server = builder - .ws_only() - .set_http_middleware( - tower::ServiceBuilder::new() - .option_layer(Self::maybe_cors_layer(self.ws_cors_domains.clone())?) - .option_layer( - self.jwt_secret.map(|secret| AuthLayer::new(JwtAuthValidator::new(secret))), - ), - ) - .set_rpc_middleware( - RpcServiceBuilder::new() - .layer(modules.ws.as_ref().map(RpcRequestMetrics::ws).unwrap_or_default()), - ) - .build(ws_socket_addr) - .await - .map_err(|err| RpcError::server_error(err, ServerKind::WS(ws_socket_addr)))?; - let addr = server - .local_addr() - .map_err(|err| RpcError::server_error(err, ServerKind::WS(ws_socket_addr)))?; - - let ws_local_addr = Some(addr); - let ws_server = Some(server); - - let builder = self.http_server_config.expect("Expected a value, but found None"); - let server = builder - .http_only() - .set_http_middleware( - tower::ServiceBuilder::new() - .option_layer(Self::maybe_cors_layer(self.http_cors_domains.clone())?) - .option_layer( - self.jwt_secret.map(|secret| AuthLayer::new(JwtAuthValidator::new(secret))), - ), - ) - .set_rpc_middleware( - RpcServiceBuilder::new() - .layer(modules.http.as_ref().map(RpcRequestMetrics::http).unwrap_or_default()), - ) - .build(http_socket_addr) - .await - .map_err(|err| RpcError::server_error(err, ServerKind::Http(http_socket_addr)))?; - let local_addr = server - .local_addr() - .map_err(|err| RpcError::server_error(err, ServerKind::Http(http_socket_addr)))?; - let http_local_addr = Some(local_addr); - let http_server = Some(server); + let mut ws_local_addr = None; + let mut ws_server = None; + let mut http_local_addr = None; + let mut http_server = None; + + // let builder = self.ws_server_config.expect("Expected a value, but found None"); + // let server = builder + // .ws_only() + // .set_http_middleware( + // tower::ServiceBuilder::new() + // .option_layer(Self::maybe_cors_layer(self.ws_cors_domains.clone())?) + // .option_layer( + // self.jwt_secret.map(|secret| AuthLayer::new(JwtAuthValidator::new(secret))), + // ), + // ) + // .set_rpc_middleware( + // RpcServiceBuilder::new() + // .layer(modules.ws.as_ref().map(RpcRequestMetrics::ws).unwrap_or_default()), + // ) + // .build(ws_socket_addr) + // .await + // .map_err(|err| RpcError::server_error(err, ServerKind::WS(ws_socket_addr)))?; + // let addr = server + // .local_addr() + // .map_err(|err| RpcError::server_error(err, ServerKind::WS(ws_socket_addr)))?; + if let Some(builder) = self.ws_server_config { + let server = builder + .ws_only() + .set_http_middleware( + tower::ServiceBuilder::new() + .option_layer(Self::maybe_cors_layer(self.ws_cors_domains.clone())?) + .option_layer( + self.jwt_secret.map(|secret| AuthLayer::new(JwtAuthValidator::new(secret))), + ), + ) + .set_rpc_middleware( + RpcServiceBuilder::new() + .layer(modules.ws.as_ref().map(RpcRequestMetrics::ws).unwrap_or_default()), + ) + .build(ws_socket_addr) + .await + .map_err(|err| RpcError::server_error(err, ServerKind::WS(ws_socket_addr)))?; + + let addr = server + .local_addr() + .map_err(|err| RpcError::server_error(err, ServerKind::WS(ws_socket_addr)))?; + + ws_local_addr = Some(addr); + ws_server = Some(server); + } + + //let builder = self.http_server_config.expect("Expected a value, but found None"); + if let Some(builder) = self.http_server_config { + let server = builder + .http_only() + .set_http_middleware( + tower::ServiceBuilder::new() + .option_layer(Self::maybe_cors_layer(self.http_cors_domains.clone())?) + .option_layer( + self.jwt_secret.map(|secret| AuthLayer::new(JwtAuthValidator::new(secret))), + ), + ) + .set_rpc_middleware( + RpcServiceBuilder::new() + .layer(modules.http.as_ref().map(RpcRequestMetrics::http).unwrap_or_default()), + ) + .build(http_socket_addr) + .await + .map_err(|err| RpcError::server_error(err, ServerKind::Http(http_socket_addr)))?; + let local_addr = server + .local_addr() + .map_err(|err| RpcError::server_error(err, ServerKind::Http(http_socket_addr)))?; + http_local_addr = Some(local_addr); + http_server = Some(server); + } + let http_handle = http_server .map(|http_server| http_server.start(modules.http.clone().expect("http server error"))); From e221c8f88bb62d6677d2bfca7b131f1ad90c0d65 Mon Sep 17 00:00:00 2001 From: smatthewenglish Date: Mon, 8 Jul 2024 14:27:58 -0400 Subject: [PATCH 07/12] update --- crates/rpc/rpc-builder/src/lib.rs | 27 --------------------------- 1 file changed, 27 deletions(-) diff --git a/crates/rpc/rpc-builder/src/lib.rs b/crates/rpc/rpc-builder/src/lib.rs index f8dc1c78a7bc..74b46e7a0f7f 100644 --- a/crates/rpc/rpc-builder/src/lib.rs +++ b/crates/rpc/rpc-builder/src/lib.rs @@ -1282,11 +1282,6 @@ impl RpcServerConfig { cors.as_deref().map(cors::create_cors_layer).transpose() } - /// Creates the [`AuthLayer`] if any - // fn maybe_jwt_layer(&self) -> Option> { - // self.jwt_secret.map(|secret| AuthLayer::new(JwtAuthValidator::new(secret))) - // } - /// Builds and starts the ws and http server(s). /// /// If both are on the same port, they are combined into one server. @@ -1384,26 +1379,6 @@ impl RpcServerConfig { let mut http_local_addr = None; let mut http_server = None; - // let builder = self.ws_server_config.expect("Expected a value, but found None"); - // let server = builder - // .ws_only() - // .set_http_middleware( - // tower::ServiceBuilder::new() - // .option_layer(Self::maybe_cors_layer(self.ws_cors_domains.clone())?) - // .option_layer( - // self.jwt_secret.map(|secret| AuthLayer::new(JwtAuthValidator::new(secret))), - // ), - // ) - // .set_rpc_middleware( - // RpcServiceBuilder::new() - // .layer(modules.ws.as_ref().map(RpcRequestMetrics::ws).unwrap_or_default()), - // ) - // .build(ws_socket_addr) - // .await - // .map_err(|err| RpcError::server_error(err, ServerKind::WS(ws_socket_addr)))?; - // let addr = server - // .local_addr() - // .map_err(|err| RpcError::server_error(err, ServerKind::WS(ws_socket_addr)))?; if let Some(builder) = self.ws_server_config { let server = builder .ws_only() @@ -1430,7 +1405,6 @@ impl RpcServerConfig { ws_server = Some(server); } - //let builder = self.http_server_config.expect("Expected a value, but found None"); if let Some(builder) = self.http_server_config { let server = builder .http_only() @@ -1455,7 +1429,6 @@ impl RpcServerConfig { http_server = Some(server); } - let http_handle = http_server .map(|http_server| http_server.start(modules.http.clone().expect("http server error"))); let ws_handle = ws_server From 261a3f5444bea3d329f5ffe3a24dd7492a7df573 Mon Sep 17 00:00:00 2001 From: smatthewenglish Date: Mon, 8 Jul 2024 14:30:45 -0400 Subject: [PATCH 08/12] format --- crates/rpc/rpc-builder/src/lib.rs | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/crates/rpc/rpc-builder/src/lib.rs b/crates/rpc/rpc-builder/src/lib.rs index 74b46e7a0f7f..f1bbbfe31538 100644 --- a/crates/rpc/rpc-builder/src/lib.rs +++ b/crates/rpc/rpc-builder/src/lib.rs @@ -1386,7 +1386,8 @@ impl RpcServerConfig { tower::ServiceBuilder::new() .option_layer(Self::maybe_cors_layer(self.ws_cors_domains.clone())?) .option_layer( - self.jwt_secret.map(|secret| AuthLayer::new(JwtAuthValidator::new(secret))), + self.jwt_secret + .map(|secret| AuthLayer::new(JwtAuthValidator::new(secret))), ), ) .set_rpc_middleware( @@ -1396,11 +1397,11 @@ impl RpcServerConfig { .build(ws_socket_addr) .await .map_err(|err| RpcError::server_error(err, ServerKind::WS(ws_socket_addr)))?; - + let addr = server .local_addr() .map_err(|err| RpcError::server_error(err, ServerKind::WS(ws_socket_addr)))?; - + ws_local_addr = Some(addr); ws_server = Some(server); } @@ -1412,12 +1413,14 @@ impl RpcServerConfig { tower::ServiceBuilder::new() .option_layer(Self::maybe_cors_layer(self.http_cors_domains.clone())?) .option_layer( - self.jwt_secret.map(|secret| AuthLayer::new(JwtAuthValidator::new(secret))), + self.jwt_secret + .map(|secret| AuthLayer::new(JwtAuthValidator::new(secret))), ), ) .set_rpc_middleware( - RpcServiceBuilder::new() - .layer(modules.http.as_ref().map(RpcRequestMetrics::http).unwrap_or_default()), + RpcServiceBuilder::new().layer( + modules.http.as_ref().map(RpcRequestMetrics::http).unwrap_or_default(), + ), ) .build(http_socket_addr) .await From 2773aee21a90ddf3d500e433af9ec122f2d21ab6 Mon Sep 17 00:00:00 2001 From: smatthewenglish Date: Mon, 8 Jul 2024 14:47:56 -0400 Subject: [PATCH 09/12] update --- crates/rpc/rpc-builder/src/lib.rs | 94 +++++++++++++++++-------------- 1 file changed, 52 insertions(+), 42 deletions(-) diff --git a/crates/rpc/rpc-builder/src/lib.rs b/crates/rpc/rpc-builder/src/lib.rs index f1bbbfe31538..5a20145b8209 100644 --- a/crates/rpc/rpc-builder/src/lib.rs +++ b/crates/rpc/rpc-builder/src/lib.rs @@ -55,7 +55,7 @@ //! evm_config, //! ) //! .build(transports, EthApiBuild::build); -//! RpcServerConfig::default() +//! let handle = RpcServerConfig::default() //! .with_http(ServerBuilder::default()) //! .start(&transport_modules) //! .await; @@ -1282,6 +1282,16 @@ impl RpcServerConfig { cors.as_deref().map(cors::create_cors_layer).transpose() } + // /// Creates the [`AuthLayer`] if any + // fn maybe_jwt_layer(&self) -> Option> { + // self.jwt_secret.map(|secret| AuthLayer::new(JwtAuthValidator::new(secret))) + // } + + /// Creates the [`AuthLayer`] if any + fn maybe_jwt_layer(jwt_secret: Option) -> Option> { + jwt_secret.map(|secret| AuthLayer::new(JwtAuthValidator::new(secret))) + } + /// Builds and starts the ws and http server(s). /// /// If both are on the same port, they are combined into one server. @@ -1331,47 +1341,47 @@ impl RpcServerConfig { // we merge this into one server using the http setup modules.config.ensure_ws_http_identical()?; - let builder = self.http_server_config.expect("Expected a value, but found None"); - - let server = builder - .set_http_middleware( - tower::ServiceBuilder::new() - .option_layer(Self::maybe_cors_layer(cors)?) - .option_layer( - self.jwt_secret - .map(|secret| AuthLayer::new(JwtAuthValidator::new(secret))), + //let builder = self.http_server_config.expect("Expected a value, but found None"); + if let Some(builder) = self.http_server_config { + let server = builder + .set_http_middleware( + tower::ServiceBuilder::new() + .option_layer(Self::maybe_cors_layer(cors)?) + .option_layer(Self::maybe_jwt_layer(self.jwt_secret)), + ) + .set_rpc_middleware( + RpcServiceBuilder::new().layer( + modules + .http + .as_ref() + .or(modules.ws.as_ref()) + .map(RpcRequestMetrics::same_port) + .unwrap_or_default(), ), - ) - .set_rpc_middleware( - RpcServiceBuilder::new().layer( - modules - .http - .as_ref() - .or(modules.ws.as_ref()) - .map(RpcRequestMetrics::same_port) - .unwrap_or_default(), - ), - ) - .build(http_socket_addr) - .await - .map_err(|err| RpcError::server_error(err, ServerKind::WsHttp(http_socket_addr)))?; - let addr = server - .local_addr() - .map_err(|err| RpcError::server_error(err, ServerKind::WsHttp(http_socket_addr)))?; - if let Some(module) = modules.http.as_ref().or(modules.ws.as_ref()) { - let handle = server.start(module.clone()); - http_handle = Some(handle.clone()); - ws_handle = Some(handle); + ) + .build(http_socket_addr) + .await + .map_err(|err| { + RpcError::server_error(err, ServerKind::WsHttp(http_socket_addr)) + })?; + let addr = server.local_addr().map_err(|err| { + RpcError::server_error(err, ServerKind::WsHttp(http_socket_addr)) + })?; + if let Some(module) = modules.http.as_ref().or(modules.ws.as_ref()) { + let handle = server.start(module.clone()); + http_handle = Some(handle.clone()); + ws_handle = Some(handle); + } + return Ok(RpcServerHandle { + http_local_addr: Some(addr), + ws_local_addr: Some(addr), + http: http_handle, + ws: ws_handle, + ipc_endpoint: self.ipc_endpoint.clone(), + ipc: ipc_handle, + jwt_secret: self.jwt_secret, + }); } - return Ok(RpcServerHandle { - http_local_addr: Some(addr), - ws_local_addr: Some(addr), - http: http_handle, - ws: ws_handle, - ipc_endpoint: self.ipc_endpoint.clone(), - ipc: ipc_handle, - jwt_secret: self.jwt_secret, - }); } let mut ws_local_addr = None; @@ -1432,9 +1442,9 @@ impl RpcServerConfig { http_server = Some(server); } - let http_handle = http_server + http_handle = http_server .map(|http_server| http_server.start(modules.http.clone().expect("http server error"))); - let ws_handle = ws_server + ws_handle = ws_server .map(|ws_server| ws_server.start(modules.ws.clone().expect("ws server error"))); Ok(RpcServerHandle { http_local_addr, From 10197f9c064d9675241a4db62537d89e555d3758 Mon Sep 17 00:00:00 2001 From: smatthewenglish Date: Mon, 8 Jul 2024 14:50:18 -0400 Subject: [PATCH 10/12] update --- crates/rpc/rpc-builder/src/lib.rs | 16 ++-------------- 1 file changed, 2 insertions(+), 14 deletions(-) diff --git a/crates/rpc/rpc-builder/src/lib.rs b/crates/rpc/rpc-builder/src/lib.rs index 5a20145b8209..43419f5b1d11 100644 --- a/crates/rpc/rpc-builder/src/lib.rs +++ b/crates/rpc/rpc-builder/src/lib.rs @@ -1282,11 +1282,6 @@ impl RpcServerConfig { cors.as_deref().map(cors::create_cors_layer).transpose() } - // /// Creates the [`AuthLayer`] if any - // fn maybe_jwt_layer(&self) -> Option> { - // self.jwt_secret.map(|secret| AuthLayer::new(JwtAuthValidator::new(secret))) - // } - /// Creates the [`AuthLayer`] if any fn maybe_jwt_layer(jwt_secret: Option) -> Option> { jwt_secret.map(|secret| AuthLayer::new(JwtAuthValidator::new(secret))) @@ -1341,7 +1336,6 @@ impl RpcServerConfig { // we merge this into one server using the http setup modules.config.ensure_ws_http_identical()?; - //let builder = self.http_server_config.expect("Expected a value, but found None"); if let Some(builder) = self.http_server_config { let server = builder .set_http_middleware( @@ -1395,10 +1389,7 @@ impl RpcServerConfig { .set_http_middleware( tower::ServiceBuilder::new() .option_layer(Self::maybe_cors_layer(self.ws_cors_domains.clone())?) - .option_layer( - self.jwt_secret - .map(|secret| AuthLayer::new(JwtAuthValidator::new(secret))), - ), + .option_layer(Self::maybe_jwt_layer(self.jwt_secret)), ) .set_rpc_middleware( RpcServiceBuilder::new() @@ -1422,10 +1413,7 @@ impl RpcServerConfig { .set_http_middleware( tower::ServiceBuilder::new() .option_layer(Self::maybe_cors_layer(self.http_cors_domains.clone())?) - .option_layer( - self.jwt_secret - .map(|secret| AuthLayer::new(JwtAuthValidator::new(secret))), - ), + .option_layer(Self::maybe_jwt_layer(self.jwt_secret)), ) .set_rpc_middleware( RpcServiceBuilder::new().layer( From fe277197051e23b6847cff1df7f7218d007b363a Mon Sep 17 00:00:00 2001 From: smatthewenglish Date: Mon, 8 Jul 2024 15:08:25 -0400 Subject: [PATCH 11/12] update --- crates/rpc/rpc-builder/src/lib.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/crates/rpc/rpc-builder/src/lib.rs b/crates/rpc/rpc-builder/src/lib.rs index 43419f5b1d11..200aec81aca6 100644 --- a/crates/rpc/rpc-builder/src/lib.rs +++ b/crates/rpc/rpc-builder/src/lib.rs @@ -1293,6 +1293,7 @@ impl RpcServerConfig { pub async fn start(self, modules: &TransportRpcModules) -> Result { let mut http_handle = None; let mut ws_handle = None; + let mut ipc_handle = None; let http_socket_addr = self.http_addr.unwrap_or(SocketAddr::V4(SocketAddrV4::new( Ipv4Addr::LOCALHOST, @@ -1308,10 +1309,12 @@ impl RpcServerConfig { let ipc_path = self.ipc_endpoint.clone().unwrap_or_else(|| constants::DEFAULT_IPC_ENDPOINT.into()); - let builder = self.ipc_server_config.expect("Expected a value, but found None"); - let ipc = - builder.set_rpc_middleware(IpcRpcServiceBuilder::new().layer(metrics)).build(ipc_path); - let ipc_handle = Some(ipc.start(modules.ipc.clone().expect("ipc server error")).await?); + if let Some(builder) = self.ipc_server_config { + let ipc = builder + .set_rpc_middleware(IpcRpcServiceBuilder::new().layer(metrics)) + .build(ipc_path); + ipc_handle = Some(ipc.start(modules.ipc.clone().expect("ipc server error")).await?); + } // If both are configured on the same port, we combine them into one server. if self.http_addr == self.ws_addr && From 3e577fb6b3ec575ff7a744788e072142f1b0d919 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Mon, 8 Jul 2024 22:53:23 +0200 Subject: [PATCH 12/12] docs: touchups --- crates/rpc/rpc-builder/src/lib.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/crates/rpc/rpc-builder/src/lib.rs b/crates/rpc/rpc-builder/src/lib.rs index 200aec81aca6..8a6dce5ae6d7 100644 --- a/crates/rpc/rpc-builder/src/lib.rs +++ b/crates/rpc/rpc-builder/src/lib.rs @@ -1287,9 +1287,11 @@ impl RpcServerConfig { jwt_secret.map(|secret| AuthLayer::new(JwtAuthValidator::new(secret))) } - /// Builds and starts the ws and http server(s). + /// Builds and starts the configured server(s): http, ws, ipc. /// - /// If both are on the same port, they are combined into one server. + /// If both http and ws are on the same port, they are combined into one server. + /// + /// Returns the [`RpcServerHandle`] with the handle to the started servers. pub async fn start(self, modules: &TransportRpcModules) -> Result { let mut http_handle = None; let mut ws_handle = None;