Skip to content

Commit

Permalink
use futures oneshot
Browse files Browse the repository at this point in the history
  • Loading branch information
niklasad1 committed Oct 11, 2021
1 parent ca77d88 commit 0432f6b
Showing 1 changed file with 12 additions and 6 deletions.
18 changes: 12 additions & 6 deletions http-server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
// DEALINGS IN THE SOFTWARE.

use crate::{response, AccessControl};
use futures_channel::mpsc;
use futures_channel::{mpsc, oneshot};
use futures_util::future::join_all;
use futures_util::stream::StreamExt;
use hyper::{
Expand Down Expand Up @@ -142,14 +142,18 @@ impl Default for Builder {
/// Handle used to stop the running server.
#[derive(Debug)]
pub struct StopHandle {
stop_sender: mpsc::Sender<()>,
stop_sender: oneshot::Sender<()>,
stop_handle: Option<tokio::task::JoinHandle<()>>,
}

impl StopHandle {
/// Requests server to stop. Returns an error if server was already stopped.
pub fn stop(mut self) -> Result<tokio::task::JoinHandle<()>, Error> {
let stop = self.stop_sender.try_send(()).and_then(|_| Ok(self.stop_handle.take()));
///
/// Returns a future that can be awaited for when the server shuts down.
pub fn stop(self) -> Result<tokio::task::JoinHandle<()>, Error> {
let sender = self.stop_sender;
let mut handle = self.stop_handle;
let stop = sender.send(()).and_then(|_| Ok(handle.take()));
match stop {
Ok(Some(handle)) => Ok(handle),
_ => Err(Error::AlreadyStopped),
Expand Down Expand Up @@ -184,7 +188,7 @@ impl Server {
pub fn start(mut self, methods: impl Into<Methods>) -> Result<StopHandle, Error> {
let max_request_body_size = self.max_request_body_size;
let access_control = self.access_control;
let (tx, mut rx) = mpsc::channel(1);
let (tx, rx) = oneshot::channel();
let listener = self.listener;
let resources = self.resources;
let methods = methods.into().initialize_resources(&resources)?;
Expand Down Expand Up @@ -291,7 +295,9 @@ impl Server {

let handle = rt.spawn(async move {
let server = listener.serve(make_service);
let _ = server.with_graceful_shutdown(async move { rx.next().await.map_or((), |_| ()) }).await;
server.with_graceful_shutdown(async move {
rx.await.ok();
});
});

Ok(StopHandle { stop_handle: Some(handle), stop_sender: tx })
Expand Down

0 comments on commit 0432f6b

Please sign in to comment.