Skip to content

Commit

Permalink
fix according to review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
smatthewenglish committed Jul 8, 2024
1 parent b5b129b commit 96b3aea
Show file tree
Hide file tree
Showing 6 changed files with 149 additions and 86 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -535,4 +535,4 @@ proptest-derive = "0.5"
serial_test = "3"
similar-asserts = "1.5.0"
test-fuzz = "5"
iai-callgrind = "0.11.1"
iai-callgrind = "0.11"
2 changes: 1 addition & 1 deletion crates/node/builder/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ where

extend_rpc_modules.extend_rpc_modules(ctx)?;

let mut server_config = config.rpc.rpc_server_config();
let server_config = config.rpc.rpc_server_config();
let cloned_modules = modules.clone();
let launch_rpc = server_config.start(&cloned_modules).await?;

Expand Down
189 changes: 121 additions & 68 deletions crates/rpc/rpc-builder/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1281,20 +1281,16 @@ impl RpcServerConfig {
}

/// Creates the [`AuthLayer`] if any
fn maybe_jwt_layer(&self) -> Option<AuthLayer<JwtAuthValidator>> {
self.jwt_secret.map(|secret| AuthLayer::new(JwtAuthValidator::new(secret)))
}
// fn maybe_jwt_layer(&self) -> Option<AuthLayer<JwtAuthValidator>> {
// self.jwt_secret.map(|secret| AuthLayer::new(JwtAuthValidator::new(secret)))
// }

/// Builds and starts the ws and http server(s).
///
/// If both are on the same port, they are combined into one server.
pub async fn start(
self,
modules: &TransportRpcModules,
) -> Result<RpcServerHandle, RpcError> {
pub async fn start(self, modules: &TransportRpcModules) -> Result<RpcServerHandle, RpcError> {
let mut http_handle = None;
let mut ws_handle = None;
let mut ipc_handle = None;

let http_socket_addr = self.http_addr.unwrap_or(SocketAddr::V4(SocketAddrV4::new(
Ipv4Addr::LOCALHOST,
Expand All @@ -1310,12 +1306,16 @@ impl RpcServerConfig {
let ipc_path =
self.ipc_endpoint.clone().unwrap_or_else(|| constants::DEFAULT_IPC_ENDPOINT.into());

if let Some(builder) = self.ipc_server_config.take() {
let ipc = builder
.set_rpc_middleware(IpcRpcServiceBuilder::new().layer(metrics))
.build(ipc_path);
ipc_handle = Some(ipc.start(modules.ipc.clone().expect("ipc server error")).await?);
}
// if let Some(builder) = self.ipc_server_config.take() {
// let ipc = builder
// .set_rpc_middleware(IpcRpcServiceBuilder::new().layer(metrics))
// .build(ipc_path);
// ipc_handle = Some(ipc.start(modules.ipc.clone().expect("ipc server error")).await?);
// }
let builder = self.ipc_server_config.expect("Expected a value, but found None");
let ipc =
builder.set_rpc_middleware(IpcRpcServiceBuilder::new().layer(metrics)).build(ipc_path);
let ipc_handle = Some(ipc.start(modules.ipc.clone().expect("ipc server error")).await?);

// If both are configured on the same port, we combine them into one server.
if self.http_addr == self.ws_addr &&
Expand All @@ -1338,16 +1338,21 @@ impl RpcServerConfig {
.cloned();

// we merge this into one server using the http setup
self.ws_server_config.take();
//self.ws_server_config.take();

modules.config.ensure_ws_http_identical()?;

let builder = self.http_server_config.take().expect("http_server_config is Some");
//let builder = self.http_server_config.take().expect("http_server_config is Some");
let builder = self.http_server_config.expect("Expected a value, but found None");

let server = builder
.set_http_middleware(
tower::ServiceBuilder::new()
.option_layer(Self::maybe_cors_layer(cors)?)
.option_layer(self.maybe_jwt_layer()),
.option_layer(
self.jwt_secret
.map(|secret| AuthLayer::new(JwtAuthValidator::new(secret))),
),
)
.set_rpc_middleware(
RpcServiceBuilder::new().layer(
Expand Down Expand Up @@ -1381,59 +1386,107 @@ impl RpcServerConfig {
});
}

let mut http_local_addr = None;
let mut http_server = None;

let mut ws_local_addr = None;
let mut ws_server = None;
if let Some(builder) = self.ws_server_config.take() {
let server = builder
.ws_only()
.set_http_middleware(
tower::ServiceBuilder::new()
.option_layer(Self::maybe_cors_layer(self.ws_cors_domains.clone())?)
.option_layer(self.maybe_jwt_layer()),
)
.set_rpc_middleware(
RpcServiceBuilder::new()
.layer(modules.ws.as_ref().map(RpcRequestMetrics::ws).unwrap_or_default()),
)
.build(ws_socket_addr)
.await
.map_err(|err| RpcError::server_error(err, ServerKind::WS(ws_socket_addr)))?;
let addr = server
.local_addr()
.map_err(|err| RpcError::server_error(err, ServerKind::WS(ws_socket_addr)))?;

ws_local_addr = Some(addr);
ws_server = Some(server);
}

if let Some(builder) = self.http_server_config.take() {
let server = builder
.http_only()
.set_http_middleware(
tower::ServiceBuilder::new()
.option_layer(Self::maybe_cors_layer(self.http_cors_domains.clone())?)
.option_layer(self.maybe_jwt_layer()),
)
.set_rpc_middleware(
RpcServiceBuilder::new().layer(
modules.http.as_ref().map(RpcRequestMetrics::http).unwrap_or_default(),
//let mut http_local_addr = None;
//let mut http_server = None;

//let mut ws_local_addr = None;
//let mut ws_server = None;

// if let Some(builder) = self.ws_server_config.take() {
// let server = builder
// .ws_only()
// .set_http_middleware(
// tower::ServiceBuilder::new()
// .option_layer(Self::maybe_cors_layer(self.ws_cors_domains.clone())?)
// .option_layer(self.jwt_secret.map(|secret|
// AuthLayer::new(JwtAuthValidator::new(secret)))), )
// .set_rpc_middleware(
// RpcServiceBuilder::new()
//
// .layer(modules.ws.as_ref().map(RpcRequestMetrics::ws).unwrap_or_default()),
// )
// .build(ws_socket_addr)
// .await
// .map_err(|err| RpcError::server_error(err, ServerKind::WS(ws_socket_addr)))?;
// let addr = server
// .local_addr()
// .map_err(|err| RpcError::server_error(err, ServerKind::WS(ws_socket_addr)))?;

// ws_local_addr = Some(addr);
// ws_server = Some(server);
// }
let builder = self.ws_server_config.expect("Expected a value, but found None");
let server = builder
.ws_only()
.set_http_middleware(
tower::ServiceBuilder::new()
.option_layer(Self::maybe_cors_layer(self.ws_cors_domains.clone())?)
.option_layer(
self.jwt_secret.map(|secret| AuthLayer::new(JwtAuthValidator::new(secret))),
),
)
.build(http_socket_addr)
.await
.map_err(|err| RpcError::server_error(err, ServerKind::Http(http_socket_addr)))?;
let local_addr = server
.local_addr()
.map_err(|err| RpcError::server_error(err, ServerKind::Http(http_socket_addr)))?;
http_local_addr = Some(local_addr);
http_server = Some(server);
}
http_handle = http_server
)
.set_rpc_middleware(
RpcServiceBuilder::new()
.layer(modules.ws.as_ref().map(RpcRequestMetrics::ws).unwrap_or_default()),
)
.build(ws_socket_addr)
.await
.map_err(|err| RpcError::server_error(err, ServerKind::WS(ws_socket_addr)))?;
let addr = server
.local_addr()
.map_err(|err| RpcError::server_error(err, ServerKind::WS(ws_socket_addr)))?;

let ws_local_addr = Some(addr);
let ws_server = Some(server);

// if let Some(builder) = self.http_server_config.take() {
// let server = builder
// .http_only()
// .set_http_middleware(
// tower::ServiceBuilder::new()
// .option_layer(Self::maybe_cors_layer(self.http_cors_domains.clone())?)
// .option_layer(self.jwt_secret.map(|secret|
// AuthLayer::new(JwtAuthValidator::new(secret)))), )
// .set_rpc_middleware(
// RpcServiceBuilder::new().layer(
// modules.http.as_ref().map(RpcRequestMetrics::http).unwrap_or_default(),
// ),
// )
// .build(http_socket_addr)
// .await
// .map_err(|err| RpcError::server_error(err, ServerKind::Http(http_socket_addr)))?;
// let local_addr = server
// .local_addr()
// .map_err(|err| RpcError::server_error(err, ServerKind::Http(http_socket_addr)))?;
// http_local_addr = Some(local_addr);
// http_server = Some(server);
// }
let builder = self.http_server_config.expect("Expected a value, but found None");
let server = builder
.http_only()
.set_http_middleware(
tower::ServiceBuilder::new()
.option_layer(Self::maybe_cors_layer(self.http_cors_domains.clone())?)
.option_layer(
self.jwt_secret.map(|secret| AuthLayer::new(JwtAuthValidator::new(secret))),
),
)
.set_rpc_middleware(
RpcServiceBuilder::new()
.layer(modules.http.as_ref().map(RpcRequestMetrics::http).unwrap_or_default()),
)
.build(http_socket_addr)
.await
.map_err(|err| RpcError::server_error(err, ServerKind::Http(http_socket_addr)))?;
let local_addr = server
.local_addr()
.map_err(|err| RpcError::server_error(err, ServerKind::Http(http_socket_addr)))?;
let http_local_addr = Some(local_addr);
let http_server = Some(server);

let http_handle = http_server
.map(|http_server| http_server.start(modules.http.clone().expect("http server error")));
ws_handle = ws_server
let ws_handle = ws_server
.map(|ws_server| ws_server.start(modules.ws.clone().expect("ws server error")));
Ok(RpcServerHandle {
http_local_addr,
Expand Down
10 changes: 5 additions & 5 deletions crates/rpc/rpc-builder/tests/it/startup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ async fn test_http_addr_in_use() {
let builder = test_rpc_builder();
let server = builder
.build(TransportRpcModuleConfig::set_http(vec![RethRpcModule::Admin]), EthApiBuild::build);
let mut config = RpcServerConfig::http(Default::default()).with_http_address(addr);
let config = RpcServerConfig::http(Default::default()).with_http_address(addr);
let result = config.start(&server).await;
let err = result.unwrap_err();
assert!(is_addr_in_use_kind(&err, ServerKind::Http(addr)), "{err}");
Expand All @@ -41,7 +41,7 @@ async fn test_ws_addr_in_use() {
let builder = test_rpc_builder();
let server = builder
.build(TransportRpcModuleConfig::set_ws(vec![RethRpcModule::Admin]), EthApiBuild::build);
let mut config = RpcServerConfig::ws(Default::default()).with_ws_address(addr);
let config = RpcServerConfig::ws(Default::default()).with_ws_address(addr);
let result = config.start(&server).await;
let err = result.unwrap_err();
assert!(is_addr_in_use_kind(&err, ServerKind::WS(addr)), "{err}");
Expand All @@ -64,7 +64,7 @@ async fn test_launch_same_port_different_modules() {
EthApiBuild::build,
);
let addr = test_address();
let mut config = RpcServerConfig::ws(Default::default())
let config = RpcServerConfig::ws(Default::default())
.with_ws_address(addr)
.with_http(Default::default())
.with_http_address(addr);
Expand All @@ -85,7 +85,7 @@ async fn test_launch_same_port_same_cors() {
EthApiBuild::build,
);
let addr = test_address();
let mut config = RpcServerConfig::ws(Default::default())
let config = RpcServerConfig::ws(Default::default())
.with_ws_address(addr)
.with_http(Default::default())
.with_cors(Some("*".to_string()))
Expand All @@ -104,7 +104,7 @@ async fn test_launch_same_port_different_cors() {
EthApiBuild::build,
);
let addr = test_address();
let mut config = RpcServerConfig::ws(Default::default())
let config = RpcServerConfig::ws(Default::default())
.with_ws_address(addr)
.with_http(Default::default())
.with_cors(Some("*".to_string()))
Expand Down
30 changes: 20 additions & 10 deletions crates/rpc/rpc-builder/tests/it/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,16 +53,22 @@ pub async fn launch_auth(secret: JwtSecret) -> AuthServerHandle {
pub async fn launch_http(modules: impl Into<RpcModuleSelection>) -> RpcServerHandle {
let builder = test_rpc_builder();
let server = builder.build(TransportRpcModuleConfig::set_http(modules), EthApiBuild::build);
let mut config = RpcServerConfig::http(Default::default()).with_http_address(test_address());
config.start(&server).await.unwrap()
RpcServerConfig::http(Default::default())
.with_http_address(test_address())
.start(&server)
.await
.unwrap()
}

/// Launches a new server with ws only with the given modules
pub async fn launch_ws(modules: impl Into<RpcModuleSelection>) -> RpcServerHandle {
let builder = test_rpc_builder();
let server = builder.build(TransportRpcModuleConfig::set_ws(modules), EthApiBuild::build);
let mut config = RpcServerConfig::ws(Default::default()).with_http_address(test_address());
config.start(&server).await.unwrap()
RpcServerConfig::ws(Default::default())
.with_http_address(test_address())
.start(&server)
.await
.unwrap()
}

/// Launches a new server with http and ws and with the given modules
Expand All @@ -73,11 +79,13 @@ pub async fn launch_http_ws(modules: impl Into<RpcModuleSelection>) -> RpcServer
TransportRpcModuleConfig::set_ws(modules.clone()).with_http(modules),
EthApiBuild::build,
);
let mut config = RpcServerConfig::ws(Default::default())
RpcServerConfig::ws(Default::default())
.with_ws_address(test_address())
.with_http(Default::default())
.with_http_address(test_address());
config.start(&server).await.unwrap()
.with_http_address(test_address())
.start(&server)
.await
.unwrap()
}

/// Launches a new server with http and ws and with the given modules on the same port.
Expand All @@ -89,11 +97,13 @@ pub async fn launch_http_ws_same_port(modules: impl Into<RpcModuleSelection>) ->
EthApiBuild::build,
);
let addr = test_address();
let mut config = RpcServerConfig::ws(Default::default())
RpcServerConfig::ws(Default::default())
.with_ws_address(addr)
.with_http(Default::default())
.with_http_address(addr);
config.start(&server).await.unwrap()
.with_http_address(addr)
.start(&server)
.await
.unwrap()
}

/// Returns an [`RpcModuleBuilder`] with testing components.
Expand Down
2 changes: 1 addition & 1 deletion examples/rpc-db/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ async fn main() -> eyre::Result<()> {
server.merge_configured(custom_rpc.into_rpc())?;

// Start the server & keep it alive
let mut server_args =
let server_args =
RpcServerConfig::http(Default::default()).with_http_address("0.0.0.0:8545".parse()?);
let _handle = server_args.start(&server).await?;
futures::future::pending::<()>().await;
Expand Down

0 comments on commit 96b3aea

Please sign in to comment.