Skip to content

Commit

Permalink
add API to use custom tokio runtime
Browse files Browse the repository at this point in the history
  • Loading branch information
niklasad1 committed Oct 11, 2021
1 parent f9dec57 commit ca77d88
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 5 deletions.
23 changes: 21 additions & 2 deletions http-server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ pub struct Builder {
resources: Resources,
max_request_body_size: u32,
keep_alive: bool,
/// Custom tokio runtime to run the server on.
tokio_runtime: Option<tokio::runtime::Handle>,
}

impl Builder {
Expand Down Expand Up @@ -90,6 +92,13 @@ impl Builder {
Ok(self)
}

/// Configure a custom [`tokio::runtime::Handle`] to run the server on.
///
/// Default: [`tokio::spawn`]
pub fn custom_tokio_runtime(mut self, rt: tokio::runtime::Handle) {
self.tokio_runtime = Some(rt);
}

/// Finalizes the configuration of the server.
pub fn build(self, addr: SocketAddr) -> Result<Server, Error> {
let domain = Domain::for_address(addr);
Expand All @@ -113,6 +122,7 @@ impl Builder {
access_control: self.access_control,
max_request_body_size: self.max_request_body_size,
resources: self.resources,
tokio_runtime: self.tokio_runtime,
})
}
}
Expand All @@ -124,6 +134,7 @@ impl Default for Builder {
resources: Resources::default(),
access_control: AccessControl::default(),
keep_alive: true,
tokio_runtime: None,
}
}
}
Expand Down Expand Up @@ -159,6 +170,8 @@ pub struct Server {
access_control: AccessControl,
/// Tracker for currently used resources on the server
resources: Resources,
/// Custom tokio runtime to run the server on.
tokio_runtime: Option<tokio::runtime::Handle>,
}

impl Server {
Expand All @@ -168,7 +181,7 @@ impl Server {
}

/// Start the server.
pub fn start(self, methods: impl Into<Methods>) -> Result<StopHandle, Error> {
pub fn start(mut self, methods: impl Into<Methods>) -> Result<StopHandle, Error> {
let max_request_body_size = self.max_request_body_size;
let access_control = self.access_control;
let (tx, mut rx) = mpsc::channel(1);
Expand Down Expand Up @@ -271,10 +284,16 @@ impl Server {
}
});

let handle = tokio::spawn(async move {
let rt = match self.tokio_runtime.take() {
Some(rt) => rt,
None => tokio::runtime::Handle::current(),
};

let handle = rt.spawn(async move {
let server = listener.serve(make_service);
let _ = server.with_graceful_shutdown(async move { rx.next().await.map_or((), |_| ()) }).await;
});

Ok(StopHandle { stop_handle: Some(handle), stop_sender: tx })
}
}
Expand Down
19 changes: 16 additions & 3 deletions ws-server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,15 @@ impl Server {
self.stop_monitor.handle()
}

/// Start responding to connections requests. This will block current thread until the server is stopped.
pub fn start(self, methods: impl Into<Methods>) -> Result<StopHandle, Error> {
/// Start responding to connections requests. This will run on the tokio runtime until the server is stopped.
pub fn start(mut self, methods: impl Into<Methods>) -> Result<StopHandle, Error> {
let methods = methods.into().initialize_resources(&self.resources)?;
let handle = self.stop_handle();

tokio::spawn(self.start_inner(methods));
match self.cfg.tokio_runtime.take() {
Some(rt) => rt.spawn(self.start_inner(methods)),
None => tokio::spawn(self.start_inner(methods)),
};

Ok(handle)
}
Expand Down Expand Up @@ -363,6 +366,8 @@ struct Settings {
allowed_origins: AllowedValue,
/// Policy by which to accept or deny incoming requests based on the `Host` header.
allowed_hosts: AllowedValue,
/// Custom tokio runtime to run the server on.
tokio_runtime: Option<tokio::runtime::Handle>,
}

impl Default for Settings {
Expand All @@ -372,6 +377,7 @@ impl Default for Settings {
max_connections: MAX_CONNECTIONS,
allowed_origins: AllowedValue::Any,
allowed_hosts: AllowedValue::Any,
tokio_runtime: None,
}
}
}
Expand Down Expand Up @@ -476,6 +482,13 @@ impl Builder {
self
}

/// Configure a custom [`tokio::runtime::Handle`] to run the server on.
///
/// Default: [`tokio::spawn`]
pub fn custom_tokio_runtime(mut self, rt: tokio::runtime::Handle) {
self.settings.tokio_runtime = Some(rt);
}

/// Finalize the configuration of the server. Consumes the [`Builder`].
pub async fn build(self, addr: impl ToSocketAddrs) -> Result<Server, Error> {
let listener = TcpListener::bind(addr).await?;
Expand Down

0 comments on commit ca77d88

Please sign in to comment.