Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add shutdown grace period #390

Merged
merged 1 commit into from
Nov 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/server/controlchan/control_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ where
match incoming {
None => {} // Loop again
Some(Ok(Event::InternalMsg(ControlChanMsg::Quit))) => {
slog::info!(logger, "Upgrading control channel to TLS");
slog::info!(logger, "Exiting control loop");
return;
}
Some(Ok(event)) => {
Expand Down
34 changes: 24 additions & 10 deletions src/server/ftpserver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ pub mod options;

use super::{
controlchan,
ftpserver::{error::ServerError, options::FtpsRequired, options::SiteMd5},
ftpserver::{error::ServerError, error::ShutdownError, options::FtpsRequired, options::SiteMd5},
shutdown,
tls::FtpsConfig,
};
Expand All @@ -20,6 +20,7 @@ use crate::{
storage::{Metadata, StorageBackend},
};

use crate::server::shutdown::Notifier;
use options::{PassiveHost, DEFAULT_GREETING, DEFAULT_IDLE_SESSION_TIMEOUT_SECS};
use slog::*;
use std::{fmt::Debug, future::Future, net::SocketAddr, ops::Range, path::PathBuf, pin::Pin, sync::Arc, time::Duration};
Expand Down Expand Up @@ -67,7 +68,7 @@ where
proxy_protocol_mode: ProxyMode,
logger: slog::Logger,
site_md5: SiteMd5,
shutdown: Option<Pin<Box<dyn Future<Output = ()> + Send + Sync>>>,
shutdown: Option<Pin<Box<dyn Future<Output = Duration> + Send + Sync>>>,
}

impl<Storage, User> Server<Storage, User>
Expand Down Expand Up @@ -403,7 +404,7 @@ where
/// Allows telling libunftp when to shutdown gracefully
///
/// The passed argument may contain a future that should resolve when libunftp should shut down.
pub fn shutdown_indicator<T: Future<Output = ()> + Send + Sync + 'static>(mut self, indicator: options::Shutdown<T>) -> Self {
pub fn shutdown_indicator<T: Future<Output = Duration> + Send + Sync + 'static>(mut self, indicator: options::Shutdown<T>) -> Self {
match indicator {
options::Shutdown::None => self.shutdown = Some(Box::pin(futures_util::future::pending())),
options::Shutdown::GracefulAcceptingConnections(signal) => self.shutdown = Some(Box::pin(signal)),
Expand Down Expand Up @@ -490,14 +491,27 @@ where

tokio::select! {
result = listen_future => result,
_ = shutdown => {
slog::info!(logger, "Shutting Down");
shutdown_notifier.notify().await;
shutdown_notifier.linger().await;
// TODO: Implement feature where we keep on listening for a while i.e. GracefulAcceptingConnections
Ok(())
}
grace_period = shutdown => {
slog::debug!(logger, "Shutting down within {:?}", grace_period);
shutdown_notifier.notify().await;
Self::shutdown_linger(logger, shutdown_notifier, grace_period).await
}
}
}

// Waits for sub-components to shut down gracefully or aborts if the grace period expires
async fn shutdown_linger(logger: slog::Logger, shutdown_notifier: Arc<Notifier>, grace_period: Duration) -> std::result::Result<(), ServerError> {
let timeout = Box::pin(tokio::time::sleep(grace_period));
tokio::select! {
_ = shutdown_notifier.linger() => {
slog::debug!(logger, "Graceful shutdown complete");
Ok(())
},
_ = timeout => {
Err(ShutdownError{ msg: "shutdown grace period expired".to_string()}.into())
}
}
// TODO: Implement feature where we keep on listening for a while i.e. GracefulAcceptingConnections
}
}

Expand Down
12 changes: 12 additions & 0 deletions src/server/ftpserver/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,15 @@ impl From<std::io::Error> for ServerError {
ServerError::new("io error", e)
}
}

#[derive(Error, Debug)]
#[error("shutdown error: {msg}")]
pub struct ShutdownError {
pub msg: String,
}

impl From<ShutdownError> for ServerError {
fn from(e: ShutdownError) -> Self {
ServerError::new("shutdown error", e)
}
}
5 changes: 3 additions & 2 deletions src/server/ftpserver/options.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Contains code pertaining to the setup options that can be given to the [`Server`](crate::Server)

use bitflags::bitflags;
use std::time::Duration;
use std::{
fmt::Formatter,
fmt::{self, Debug, Display},
Expand Down Expand Up @@ -177,7 +178,7 @@ impl Default for SiteMd5 {

/// The options for [Server.shutdown_indicator](crate::Server::shutdown_indicator) that allows users
/// to specify the way in which a (graceful) shutdown of libunftp should happen.
pub enum Shutdown<Signal: Future<Output = ()> + Send + Sync> {
pub enum Shutdown<Signal: Future<Output = Duration> + Send + Sync> {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not really happy with the interface here, I may change it still.

current usage:

    let mut server = Server::with_authenticator(storage_backend, Arc::new(authenticator))
        .greeting("Welcome to unFTP")
        .passive_ports(start_port..end_port)
        .idle_session_timeout(idle_timeout)
        .logger(root_log.new(o!("lib" => "libunftp")))
        .passive_host(passive_host)
        .sitemd5(md5_setting)
        .shutdown_indicator(options::Shutdown::GracefulAcceptingConnections( async move {
            shutdown.recv().await.ok();
            info!(l, "Shutting down FTP server");
            Duration::from_secs(10)
        }))
        .metrics();

/// No shutdown signal will be adhered to.
///
/// This will cause libunftp to keep on running as long as the future returned
Expand All @@ -199,7 +200,7 @@ pub enum Shutdown<Signal: Future<Output = ()> + Send + Sync> {
GracefulBlockingConnections(Signal),
}

impl<Signal: Future<Output = ()> + Send + Sync> Default for Shutdown<Signal> {
impl<Signal: Future<Output = Duration> + Send + Sync> Default for Shutdown<Signal> {
fn default() -> Self {
Shutdown::None
}
Expand Down