Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add API to allow listening for file events #394

Merged
merged 1 commit into from
Dec 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions crates/unftp-sbe-fs/tests/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ use async_ftp::{types::Result, FtpStream};
use libunftp::options::FtpsRequired;
use pretty_assertions::assert_eq;
use std::fmt::Debug;
use std::fs;
use std::io::{BufWriter, Write};
use std::path::PathBuf;
use std::{str, time::Duration};
use unftp_sbe_fs::ServerExt;
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
//! ```
pub mod auth;
pub(crate) mod metrics;
pub mod notification;
pub(crate) mod server;
pub mod storage;

Expand Down
4 changes: 2 additions & 2 deletions src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,11 @@ fn add_event_metric(event: &Event) {
add_command_metric(cmd);
}
Event::InternalMsg(msg) => match msg {
ControlChanMsg::SendData { bytes } => {
ControlChanMsg::SentData { bytes, .. } => {
FTP_BACKEND_READ_BYTES.inc_by(*bytes);
FTP_BACKEND_READ_FILES.inc();
}
ControlChanMsg::WrittenData { bytes } => {
ControlChanMsg::WrittenData { bytes, .. } => {
FTP_BACKEND_WRITE_BYTES.inc_by(*bytes);
FTP_BACKEND_WRITE_FILES.inc();
}
Expand Down
118 changes: 118 additions & 0 deletions src/notification/event.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
use async_trait::async_trait;
use std::fmt::Debug;
use std::sync::Arc;

/// An event pertaining to a client's login and logout actions in order to allow detection of the
/// presence of a client. Instances of these will be passed to an [`PresenceListener`](crate::notification::PresenceListener).
/// To identify the corresponding user or session see the [`EventMeta`](crate::notification::EventMeta) struct.
#[derive(Debug, Clone)]
pub enum PresenceEvent {
/// The user logged in successfully
LoggedIn,
/// The user logged out
LoggedOut,
}

/// An event signalling a change in data on the storage back-end. To identify the corresponding user
/// or session see the [`EventMeta`](crate::notification::EventMeta) struct.
#[derive(Debug, Clone)]
pub enum DataEvent {
/// A RETR command finished successfully
Got {
/// The path to the file that was obtained
path: String,

/// The amount of bytes transferred to the client
bytes: u64,
},
/// A STOR command finished successfully
Put {
/// The path to the file that was obtained
path: String,

/// The amount of bytes stored
bytes: u64,
},
/// A DEL command finished successfully
Deleted {
/// The path to the file that was deleted.
path: String,
},
/// A MKD command finished successfully
MadeDir {
/// The path to the directory that was created
path: String,
},
/// A RMD command finished successfully
RemovedDir {
/// The path to the directory that was removed
path: String,
},
/// A RNFR & RNTO command sequence finished successfully. This can be for a file or a directory.
Renamed {
/// The original path
from: String,
/// The new path
to: String,
},
}

/// Metadata relating to an event that can be used to to identify the user and session. A sequence
/// number is also included to allow ordering in systems where event ordering is not guaranteed.
#[derive(Debug, Clone)]
pub struct EventMeta {
/// The user this event pertains to. A user may have more than one connection or session.
pub username: String,
/// Identifies a single session pertaining to a connected client.
pub trace_id: String,
/// The event sequence number as incremented per session.
pub sequence_number: u64,
}

/// An listener for [`DataEvent`](crate::notification::DataEvent)s. Implementations can
/// be passed to [`Server::notify_data`](crate::Server::notify_data)
/// in order to receive notifications.
#[async_trait]
pub trait DataListener: Sync + Send + Debug {
/// Called after the event happened. Event metadata is also passed to allow pinpointing the user
/// session for which it happened.
async fn receive_data_event(&self, e: DataEvent, m: EventMeta);
}

/// An listener for [`PresenceEvent`](crate::notification::PresenceEvent)s. Implementations can
/// be passed to [`Server::notify_presence`](crate::Server::notify_presence)
/// in order to receive notifications.
#[async_trait]
pub trait PresenceListener: Sync + Send + Debug {
/// Called after the event happened. Event metadata is also passed to allow pinpointing the user
/// session for which it happened.
async fn receive_presence_event(&self, e: PresenceEvent, m: EventMeta);
}

#[async_trait]
impl DataListener for Box<dyn DataListener> {
async fn receive_data_event(&self, e: DataEvent, m: EventMeta) {
self.as_ref().receive_data_event(e, m).await
}
}

#[async_trait]
impl PresenceListener for Box<dyn PresenceListener> {
async fn receive_presence_event(&self, e: PresenceEvent, m: EventMeta) {
self.as_ref().receive_presence_event(e, m).await
}
}

#[async_trait]
impl DataListener for Arc<dyn DataListener> {
async fn receive_data_event(&self, e: DataEvent, m: EventMeta) {
self.as_ref().receive_data_event(e, m).await
}
}

#[async_trait]
impl PresenceListener for Arc<dyn PresenceListener> {
async fn receive_presence_event(&self, e: PresenceEvent, m: EventMeta) {
self.as_ref().receive_presence_event(e, m).await
}
}
17 changes: 17 additions & 0 deletions src/notification/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#![deny(missing_docs)]
//!
//! Allows users to listen to events emitted by libunftp.
//!
//! To listen for changes in data implement the [`DataListener`](crate::notification::DataListener)
//! trait and use the [`Server::notify_data`](crate::Server::notify_data) method
//! to make libunftp notify it.
//!
//! To listen to logins and logouts implement the [`PresenceListener`](crate::notification::PresenceListener)
//! trait and use the [`Server::notify_presence`](crate::Server::notify_data) method
//! to make libunftp use it.
//!

pub(crate) mod event;
pub(crate) mod nop;

pub use event::{DataEvent, DataListener, EventMeta, PresenceEvent, PresenceListener};
17 changes: 17 additions & 0 deletions src/notification/nop.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
use crate::notification::event::{DataEvent, DataListener, EventMeta, PresenceEvent, PresenceListener};

use async_trait::async_trait;

// An event listener that does nothing. Used as a default Null Object in [`Server`](crate::Server).
#[derive(Debug)]
pub struct NopListener {}

#[async_trait]
impl DataListener for NopListener {
async fn receive_data_event(&self, _: DataEvent, _: EventMeta) {}
}

#[async_trait]
impl PresenceListener for NopListener {
async fn receive_presence_event(&self, _: PresenceEvent, _: EventMeta) {}
}
34 changes: 27 additions & 7 deletions src/server/chancomms.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Contains code pertaining to the communication between the data and control channels.

use super::session::SharedSession;
use crate::server::session::TraceId;
use crate::{
auth::UserDetail,
server::controlchan::Reply,
Expand Down Expand Up @@ -45,13 +46,17 @@ pub enum ControlChanMsg {
PermissionDenied,
/// File not found
NotFound,
/// Send the data to the client
SendData {
/// Data was successfully sent to the client during a GET
SentData {
/// The path as specified by the client
path: String,
/// The number of bytes transferred
bytes: u64,
},
/// We've written the data from the client to the StorageBackend
WrittenData {
/// The path as specified by the client
path: String,
/// The number of bytes transferred
bytes: u64,
},
Expand All @@ -70,24 +75,39 @@ pub enum ControlChanMsg {
/// Successfully cwd
CwdSuccess,
/// File successfully deleted
DelSuccess,
DelFileSuccess {
/// The path as specified by the client
path: String,
},
/// File successfully deleted
RmDirSuccess {
/// The path as specified by the client
path: String,
},
/// File successfully deleted
RenameSuccess {
/// The old path as specified by the client
old_path: String,
/// The new path as specified by the client
new_path: String,
},
/// Failed to delete file
DelFail,
/// Quit the client connection
Quit,
ExitControlLoop,
/// Successfully created directory
MkdirSuccess(std::path::PathBuf),
MkDirSuccess { path: String },
/// Failed to crate directory
MkdirFail,
/// Authentication successful
AuthSuccess,
AuthSuccess { username: String, trace_id: TraceId },
/// Authentication failed
AuthFailed,
/// Sent to switch the control channel to TLS/SSL mode.
SecureControlChannel,
/// Sent to switch the control channel from TLS/SSL mode back to plaintext.
PlaintextControlChannel,
/// Errors comming from the storage
/// Errors coming from the storage backend
StorageError(Error),
/// Reply on the command channel
CommandChannelReply(Reply),
Expand Down
3 changes: 2 additions & 1 deletion src/server/controlchan/commands/dele.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,14 @@ where
let storage = Arc::clone(&session.storage);
let user = session.user.clone();
let path = session.cwd.join(self.path.clone());
let path_str = path.to_string_lossy().to_string();
let tx_success: Sender<ControlChanMsg> = args.tx_control_chan.clone();
let tx_fail: Sender<ControlChanMsg> = args.tx_control_chan.clone();
let logger = args.logger;
tokio::spawn(async move {
match storage.del((*user).as_ref().unwrap(), path).await {
Ok(_) => {
if let Err(err) = tx_success.send(ControlChanMsg::DelSuccess).await {
if let Err(err) = tx_success.send(ControlChanMsg::DelFileSuccess { path: path_str }).await {
slog::warn!(logger, "{}", err);
}
}
Expand Down
8 changes: 4 additions & 4 deletions src/server/controlchan/commands/mkd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,15 @@ where
let user = session.user.clone();
let storage = Arc::clone(&session.storage);
let path: PathBuf = session.cwd.join(self.path.clone());
let tx_success: Sender<ControlChanMsg> = args.tx_control_chan.clone();
let tx_fail: Sender<ControlChanMsg> = args.tx_control_chan.clone();
let path_str = path.to_string_lossy().to_string();
let tx: Sender<ControlChanMsg> = args.tx_control_chan.clone();
let logger = args.logger;
tokio::spawn(async move {
if let Err(err) = storage.mkd((*user).as_ref().unwrap(), &path).await {
if let Err(err) = tx_fail.send(ControlChanMsg::StorageError(err)).await {
if let Err(err) = tx.send(ControlChanMsg::StorageError(err)).await {
slog::warn!(logger, "{}", err);
}
} else if let Err(err) = tx_success.send(ControlChanMsg::MkdirSuccess(path)).await {
} else if let Err(err) = tx.send(ControlChanMsg::MkDirSuccess { path: path_str }).await {
slog::warn!(logger, "{}", err);
}
});
Expand Down
9 changes: 6 additions & 3 deletions src/server/controlchan/commands/pass.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ where
SessionState::WaitPass => {
let pass: &str = std::str::from_utf8(self.password.as_ref())?;
let pass: String = pass.to_string();
let user: String = match session.username.clone() {
let username: String = match session.username.clone() {
Some(v) => v,
None => {
slog::error!(logger, "NoneError for username. This shouldn't happen.");
Expand All @@ -74,13 +74,16 @@ where
certificate_chain: session.cert_chain.clone(),
};
tokio::spawn(async move {
let msg = match auther.authenticate(&user, &creds).await {
let msg = match auther.authenticate(&username, &creds).await {
Ok(user) => {
if user.account_enabled() {
let mut session = session2clone.lock().await;
slog::info!(logger, "User {} logged in", user);
session.user = Arc::new(Some(user));
ControlChanMsg::AuthSuccess
ControlChanMsg::AuthSuccess {
username,
trace_id: session.trace_id,
}
} else {
slog::warn!(logger, "User {} authenticated but account is disabled", user);
ControlChanMsg::AuthFailed
Expand Down
2 changes: 1 addition & 1 deletion src/server/controlchan/commands/quit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ where
let tx: Sender<ControlChanMsg> = args.tx_control_chan.clone();
let logger = args.logger;
// Let the control loop know it can exit.
if let Err(send_res) = tx.send(ControlChanMsg::Quit).await {
if let Err(send_res) = tx.send(ControlChanMsg::ExitControlLoop).await {
slog::warn!(logger, "could not send internal message: QUIT. {}", send_res);
}
Ok(Reply::new(ReplyCode::ClosingControlConnection, "Bye!"))
Expand Down
8 changes: 4 additions & 4 deletions src/server/controlchan/commands/rmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,17 @@ where
let session = args.session.lock().await;
let storage: Arc<Storage> = Arc::clone(&session.storage);
let path = session.cwd.join(self.path.clone());
let tx_success = args.tx_control_chan.clone();
let tx_fail = args.tx_control_chan.clone();
let path_str = path.to_string_lossy().to_string();
let tx = args.tx_control_chan.clone();
let logger = args.logger;
if let Err(err) = storage.rmd((*session.user).as_ref().unwrap(), path).await {
slog::warn!(logger, "Failed to delete directory: {}", err);
let r = tx_fail.send(ControlChanMsg::StorageError(err)).await;
let r = tx.send(ControlChanMsg::StorageError(err)).await;
if let Err(e) = r {
slog::warn!(logger, "Could not send internal message to notify of RMD error: {}", e);
}
} else {
let r = tx_success.send(ControlChanMsg::DelSuccess).await;
let r = tx.send(ControlChanMsg::RmDirSuccess { path: path_str }).await;
if let Err(e) = r {
slog::warn!(logger, "Could not send internal message to notify of RMD success: {}", e);
}
Expand Down
Loading