Skip to content

Commit

Permalink
add ipc start
Browse files Browse the repository at this point in the history
  • Loading branch information
smatthewenglish committed Jul 1, 2024
1 parent d36a463 commit bdf8031
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 77 deletions.
2 changes: 1 addition & 1 deletion crates/node/builder/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ where
let mut server_config = config.rpc.rpc_server_config();

let cloned_modules = modules.clone();
let launch_rpc = server_config.start_ws_http(&cloned_modules).await?;
let launch_rpc = server_config.start(&cloned_modules).await?;

if let Some(path) = launch_rpc.ipc_endpoint() {
info!(target: "reth::cli", %path, "RPC IPC server started");
Expand Down
95 changes: 29 additions & 66 deletions crates/rpc/rpc-builder/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,8 @@
//! transaction pool. [`RpcModuleBuilder::build`] returns a [`TransportRpcModules`] which contains
//! the transport specific config (what APIs are available via this transport).
//!
//! The [`RpcServerConfig`] is used to configure the [`RpcServer`] type which contains all transport
//! implementations (http server, ws server, ipc server). [`RpcServer::start`] requires the
//! [`TransportRpcModules`] so it can start the servers with the configured modules.
//! The [`RpcServerConfig`] is used to assemble and start the http server, ws server, ipc servers,
//! it requires the [`TransportRpcModules`] so it can start the servers with the configured modules.
//!
//! # Examples
//!
Expand Down Expand Up @@ -66,7 +65,7 @@
//! )
//! .build(transports);
//! let mut handle = RpcServerConfig::default().with_http(ServerBuilder::default());
//! handle.start_ws_http(&transport_modules).await.unwrap();
//! handle.start(&transport_modules).await.unwrap();
//! }
//! ```
//!
Expand Down Expand Up @@ -139,8 +138,7 @@
//! let mut config = RpcServerConfig::default();
//!
//! let (_rpc_handle, _auth_handle) =
//! try_join!(config.start_ws_http(&modules), auth_module.start_server(auth_config),)
//! .unwrap();
//! try_join!(config.start(&modules), auth_module.start_server(auth_config),).unwrap();
//! }
//! ```
Expand Down Expand Up @@ -168,7 +166,6 @@ use jsonrpsee::{
};
use reth_engine_primitives::EngineTypes;
use reth_evm::ConfigureEvm;
use reth_ipc::server::IpcServer;
use reth_network_api::{noop::NoopNetwork, NetworkInfo, Peers};
use reth_provider::{
AccountReader, BlockReader, BlockReaderIdExt, CanonStateSubscriptions, ChainSpecProvider,
Expand All @@ -192,7 +189,6 @@ use std::{
time::{Duration, SystemTime, UNIX_EPOCH},
};
use tower_http::cors::CorsLayer;
use tracing::{instrument, trace};

// re-export for convenience
pub use jsonrpsee::server::ServerBuilder;
Expand Down Expand Up @@ -250,10 +246,13 @@ where
EvmConfig: ConfigureEvm,
{
let module_config = module_config.into();
let mut server_config = server_config.into();
let modules = RpcModuleBuilder::new(provider, pool, network, executor, events, evm_config)
.build(module_config);
let handle: RpcServerHandle = server_config.start_ws_http(&modules).await?;
let handle: RpcServerHandle = server_config
.into()
.start(
&RpcModuleBuilder::new(provider, pool, network, executor, events, evm_config)
.build(module_config),
)
.await?;
Ok(handle)
}

Expand Down Expand Up @@ -511,8 +510,6 @@ where

/// Configures all [`RpcModule`]s specific to the given [`TransportRpcModuleConfig`] which can
/// be used to start the transport server(s).
///
/// See also [`RpcServer::start`]
pub fn build(self, module_config: TransportRpcModuleConfig) -> TransportRpcModules<()> {
let mut modules = TransportRpcModules::default();

Expand Down Expand Up @@ -1275,15 +1272,16 @@ impl RpcServerConfig {
self.jwt_secret.map(|secret| AuthLayer::new(JwtAuthValidator::new(secret)))
}

/// Builds the ws and http server(s).
/// Builds and starts the ws and http server(s).
///
/// If both are on the same port, they are combined into one server.
pub async fn start_ws_http(
pub async fn start(
&mut self,
modules: &TransportRpcModules,
) -> Result<RpcServerHandle, RpcError> {
let mut http_handle = None;
let mut ws_handle = None;
let mut ipc_handle = None;

let http_socket_addr = self.http_addr.unwrap_or(SocketAddr::V4(SocketAddrV4::new(
Ipv4Addr::LOCALHOST,
Expand All @@ -1295,6 +1293,17 @@ impl RpcServerConfig {
constants::DEFAULT_WS_RPC_PORT,
)));

let metrics = modules.ipc.as_ref().map(RpcRequestMetrics::ipc).unwrap_or_default();
let ipc_path =
self.ipc_endpoint.clone().unwrap_or_else(|| constants::DEFAULT_IPC_ENDPOINT.into());

if let Some(builder) = self.ipc_server_config.take() {
let ipc = builder
.set_rpc_middleware(IpcRpcServiceBuilder::new().layer(metrics))
.build(ipc_path);
ipc_handle = Some(ipc.start(modules.ipc.clone().expect("ipc server error")).await?);
}

// If both are configured on the same port, we combine them into one server.
if self.http_addr == self.ws_addr &&
self.http_server_config.is_some() &&
Expand Down Expand Up @@ -1354,8 +1363,8 @@ impl RpcServerConfig {
ws_local_addr: Some(addr),
http: http_handle,
ws: ws_handle,
ipc_endpoint: None,
ipc: None,
ipc_endpoint: self.ipc_endpoint.clone(),
ipc: ipc_handle,
jwt_secret: self.jwt_secret,
});
}
Expand Down Expand Up @@ -1419,8 +1428,8 @@ impl RpcServerConfig {
ws_local_addr,
http: http_handle,
ws: ws_handle,
ipc_endpoint: None,
ipc: None,
ipc_endpoint: self.ipc_endpoint.clone(),
ipc: ipc_handle,
jwt_secret: self.jwt_secret,
})
}
Expand Down Expand Up @@ -1632,52 +1641,6 @@ impl TransportRpcModules {
}
}

/// Container type for ipc server
#[allow(missing_debug_implementations)]
pub struct RpcServer {
/// ipc server
ipc: Option<IpcServer<Identity, Stack<RpcRequestMetrics, Identity>>>,
}

// === impl RpcServer ===

impl RpcServer {
/// Returns the endpoint of the ipc server if started.
pub fn ipc_endpoint(&self) -> Option<String> {
self.ipc.as_ref().map(|ipc| ipc.endpoint())
}

/// Starts the configured server by spawning the servers on the tokio runtime.
///
/// This returns an [RpcServerHandle] that's connected to the server task(s) until the server is
/// stopped or the [RpcServerHandle] is dropped.
#[instrument(name = "start", skip_all, fields(ipc = ?self.ipc_endpoint()), target = "rpc", level = "TRACE")]
#[allow(dead_code, unused_variables)]
pub async fn start(self, modules: TransportRpcModules) -> Result<RpcServerHandle, RpcError> {
trace!(target: "rpc", "staring RPC server");
let Self { ipc: ipc_server } = self;
let TransportRpcModules { config, http, ws, ipc } = modules;
let mut handle = RpcServerHandle {
http_local_addr: None,
ws_local_addr: None,
http: None,
ws: None,
ipc_endpoint: None,
ipc: None,
jwt_secret: None,
};

if let Some((server, module)) =
ipc_server.and_then(|server| ipc.map(|module| (server, module)))
{
handle.ipc_endpoint = Some(server.endpoint());
handle.ipc = Some(server.start(module).await?);
}

Ok(handle)
}
}

/// A handle to the spawned servers.
///
/// When this type is dropped or [`RpcServerHandle::stop`] has been called the server will be
Expand Down
10 changes: 5 additions & 5 deletions crates/rpc/rpc-builder/tests/it/startup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ async fn test_http_addr_in_use() {
let builder = test_rpc_builder();
let server = builder.build(TransportRpcModuleConfig::set_http(vec![RethRpcModule::Admin]));
let mut config = RpcServerConfig::http(Default::default()).with_http_address(addr);
let result = config.start_ws_http(&server).await;
let result = config.start(&server).await;
let err = result.unwrap_err();
assert!(is_addr_in_use_kind(&err, ServerKind::Http(addr)), "{err}");
}
Expand All @@ -38,7 +38,7 @@ async fn test_ws_addr_in_use() {
let builder = test_rpc_builder();
let server = builder.build(TransportRpcModuleConfig::set_ws(vec![RethRpcModule::Admin]));
let mut config = RpcServerConfig::ws(Default::default()).with_ws_address(addr);
let result = config.start_ws_http(&server).await;
let result = config.start(&server).await;
let err = result.unwrap_err();
assert!(is_addr_in_use_kind(&err, ServerKind::WS(addr)), "{err}");
}
Expand All @@ -63,7 +63,7 @@ async fn test_launch_same_port_different_modules() {
.with_ws_address(addr)
.with_http(Default::default())
.with_http_address(addr);
let res = config.start_ws_http(&server).await;
let res = config.start(&server).await;
let err = res.unwrap_err();
assert!(matches!(
err,
Expand All @@ -85,7 +85,7 @@ async fn test_launch_same_port_same_cors() {
.with_cors(Some("*".to_string()))
.with_http_cors(Some("*".to_string()))
.with_http_address(addr);
let res = config.start_ws_http(&server).await;
let res = config.start(&server).await;
assert!(res.is_ok());
}

Expand All @@ -103,7 +103,7 @@ async fn test_launch_same_port_different_cors() {
.with_cors(Some("*".to_string()))
.with_http_cors(Some("example".to_string()))
.with_http_address(addr);
let res = config.start_ws_http(&server).await;
let res = config.start(&server).await;
let err = res.unwrap_err();
assert!(matches!(
err,
Expand Down
8 changes: 4 additions & 4 deletions crates/rpc/rpc-builder/tests/it/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,15 @@ pub async fn launch_http(modules: impl Into<RpcModuleSelection>) -> RpcServerHan
let builder = test_rpc_builder();
let server = builder.build(TransportRpcModuleConfig::set_http(modules));
let mut config = RpcServerConfig::http(Default::default()).with_http_address(test_address());
config.start_ws_http(&server).await.unwrap()
config.start(&server).await.unwrap()
}

/// Launches a new server with ws only with the given modules
pub async fn launch_ws(modules: impl Into<RpcModuleSelection>) -> RpcServerHandle {
let builder = test_rpc_builder();
let server = builder.build(TransportRpcModuleConfig::set_ws(modules));
let mut config = RpcServerConfig::ws(Default::default()).with_http_address(test_address());
config.start_ws_http(&server).await.unwrap()
config.start(&server).await.unwrap()
}

/// Launches a new server with http and ws and with the given modules
Expand All @@ -74,7 +74,7 @@ pub async fn launch_http_ws(modules: impl Into<RpcModuleSelection>) -> RpcServer
.with_ws_address(test_address())
.with_http(Default::default())
.with_http_address(test_address());
config.start_ws_http(&server).await.unwrap()
config.start(&server).await.unwrap()
}

/// Launches a new server with http and ws and with the given modules on the same port.
Expand All @@ -88,7 +88,7 @@ pub async fn launch_http_ws_same_port(modules: impl Into<RpcModuleSelection>) ->
.with_ws_address(addr)
.with_http(Default::default())
.with_http_address(addr);
config.start_ws_http(&server).await.unwrap()
config.start(&server).await.unwrap()
}

/// Returns an [`RpcModuleBuilder`] with testing components.
Expand Down
2 changes: 1 addition & 1 deletion examples/rpc-db/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ async fn main() -> eyre::Result<()> {
// Start the server & keep it alive
let mut server_args =
RpcServerConfig::http(Default::default()).with_http_address("0.0.0.0:8545".parse()?);
let _handle = server_args.start_ws_http(&server).await?;
let _handle = server_args.start(&server).await?;
futures::future::pending::<()>().await;

Ok(())
Expand Down

0 comments on commit bdf8031

Please sign in to comment.