Skip to content

Commit

Permalink
Add API to allow listening for events
Browse files Browse the repository at this point in the history
  • Loading branch information
hannesdejager committed Dec 24, 2021
1 parent d1d39b3 commit 63b8ed0
Show file tree
Hide file tree
Showing 21 changed files with 361 additions and 50 deletions.
2 changes: 0 additions & 2 deletions crates/unftp-sbe-fs/tests/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ use async_ftp::{types::Result, FtpStream};
use libunftp::options::FtpsRequired;
use pretty_assertions::assert_eq;
use std::fmt::Debug;
use std::fs;
use std::io::{BufWriter, Write};
use std::path::PathBuf;
use std::{str, time::Duration};
use unftp_sbe_fs::ServerExt;
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
//! ```
pub mod auth;
pub(crate) mod metrics;
pub mod notification;
pub(crate) mod server;
pub mod storage;

Expand Down
4 changes: 2 additions & 2 deletions src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,11 @@ fn add_event_metric(event: &Event) {
add_command_metric(cmd);
}
Event::InternalMsg(msg) => match msg {
ControlChanMsg::SendData { bytes } => {
ControlChanMsg::SentData { bytes, .. } => {
FTP_BACKEND_READ_BYTES.inc_by(*bytes);
FTP_BACKEND_READ_FILES.inc();
}
ControlChanMsg::WrittenData { bytes } => {
ControlChanMsg::WrittenData { bytes, .. } => {
FTP_BACKEND_WRITE_BYTES.inc_by(*bytes);
FTP_BACKEND_WRITE_FILES.inc();
}
Expand Down
88 changes: 88 additions & 0 deletions src/notification/event.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
use async_trait::async_trait;
use std::fmt::Debug;

/// An event pertaining to a client's login and logout actions in order to allow detection of the
/// presence of a client. Instances of these will be passed to an [`PresenceListener`](crate::notification::PresenceListener).
/// To identify the corresponding user or session see the [`EventMeta`](crate::notification::EventMeta) struct.
pub enum PresenceEvent {
/// The user logged in successfully
LoggedIn,
/// The user logged out
LoggedOut,
}

/// An event signalling a change in data on the storage back-end. To identify the corresponding user
/// or session see the [`EventMeta`](crate::notification::EventMeta) struct.
#[derive(Debug, Clone)]
pub enum DataEvent {
/// A RETR command finished successfully
Got {
/// The path to the file that was obtained
path: String,

/// The amount of bytes transferred to the client
bytes: u64,
},
/// A STOR command finished successfully
Put {
/// The path to the file that was obtained
path: String,

/// The amount of bytes stored
bytes: u64,
},
/// A DEL command finished successfully
Deleted {
/// The path to the file that was deleted.
path: String,
},
/// A MKD command finished successfully
MadeDir {
/// The path to the directory that was created
path: String,
},
/// A RMD command finished successfully
RemovedDir {
/// The path to the directory that was removed
path: String,
},
/// A RNFR & RNTO command sequence finished successfully. This can be for a file or a directory.
Renamed {
/// The original path
from: String,
/// The new path
to: String,
},
}

/// Metadata relating to an event that can be used to to identify the user and session. A sequence
/// number is also included to allow ordering in systems where event ordering is not guaranteed.
#[derive(Debug, Clone)]
pub struct EventMeta {
/// The user this event pertains to. A user may have more than one connection or session.
pub username: String,
/// Identifies a single session pertaining to a connected client.
pub trace_id: String,
/// The event sequence number as incremented per session.
pub sequence_number: u64,
}

/// An listener for [`DataEvent`](crate::notification::DataEvent)s. Implementations can
/// be passed to [`Server::notify_data`](crate::Server::notify_data)
/// in order to receive notifications.
#[async_trait]
pub trait DataListener: Sync + Send + Debug {
/// Called after the event happened. Event metadata is also passed to allow pinpointing the user
/// session for which it happened.
async fn receive_data_event(&self, e: DataEvent, m: EventMeta);
}

/// An listener for [`PresenceEvent`](crate::notification::PresenceEvent)s. Implementations can
/// be passed to [`Server::notify_presence`](crate::Server::notify_presence)
/// in order to receive notifications.
#[async_trait]
pub trait PresenceListener: Sync + Send + Debug {
/// Called after the event happened. Event metadata is also passed to allow pinpointing the user
/// session for which it happened.
async fn receive_presence_event(&self, e: PresenceEvent, m: EventMeta);
}
17 changes: 17 additions & 0 deletions src/notification/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#![deny(missing_docs)]
//!
//! Allows users to listen to events emitted by libunftp.
//!
//! To listen for changes in data implement the [`DataListener`](crate::notification::DataListener)
//! trait and use the [`Server::notify_data`](crate::Server::notify_data) method
//! to make libunftp notify it.
//!
//! To listen to logins and logouts implement the [`PresenceListener`](crate::notification::PresenceListener)
//! trait and use the [`Server::notify_presence`](crate::Server::notify_data) method
//! to make libunftp use it.
//!

pub(crate) mod event;
pub(crate) mod nop;

pub use event::{DataEvent, DataListener, EventMeta, PresenceEvent, PresenceListener};
17 changes: 17 additions & 0 deletions src/notification/nop.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
use crate::notification::event::{DataEvent, DataListener, EventMeta, PresenceEvent, PresenceListener};

use async_trait::async_trait;

// An event listener that does nothing. Used as a default Null Object in [`Server`](crate::Server).
#[derive(Debug)]
pub struct NopListener {}

#[async_trait]
impl DataListener for NopListener {
async fn receive_data_event(&self, _: DataEvent, _: EventMeta) {}
}

#[async_trait]
impl PresenceListener for NopListener {
async fn receive_presence_event(&self, _: PresenceEvent, _: EventMeta) {}
}
34 changes: 27 additions & 7 deletions src/server/chancomms.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Contains code pertaining to the communication between the data and control channels.

use super::session::SharedSession;
use crate::server::session::TraceId;
use crate::{
auth::UserDetail,
server::controlchan::Reply,
Expand Down Expand Up @@ -45,13 +46,17 @@ pub enum ControlChanMsg {
PermissionDenied,
/// File not found
NotFound,
/// Send the data to the client
SendData {
/// Data was successfully sent to the client during a GET
SentData {
/// The path as specified by the client
path: String,
/// The number of bytes transferred
bytes: u64,
},
/// We've written the data from the client to the StorageBackend
WrittenData {
/// The path as specified by the client
path: String,
/// The number of bytes transferred
bytes: u64,
},
Expand All @@ -70,24 +75,39 @@ pub enum ControlChanMsg {
/// Successfully cwd
CwdSuccess,
/// File successfully deleted
DelSuccess,
DelFileSuccess {
/// The path as specified by the client
path: String,
},
/// File successfully deleted
RmDirSuccess {
/// The path as specified by the client
path: String,
},
/// File successfully deleted
RenameSuccess {
/// The old path as specified by the client
old_path: String,
/// The new path as specified by the client
new_path: String,
},
/// Failed to delete file
DelFail,
/// Quit the client connection
Quit,
ExitControlLoop,
/// Successfully created directory
MkdirSuccess(std::path::PathBuf),
MkDirSuccess { path: String },
/// Failed to crate directory
MkdirFail,
/// Authentication successful
AuthSuccess,
AuthSuccess { username: String, trace_id: TraceId },
/// Authentication failed
AuthFailed,
/// Sent to switch the control channel to TLS/SSL mode.
SecureControlChannel,
/// Sent to switch the control channel from TLS/SSL mode back to plaintext.
PlaintextControlChannel,
/// Errors comming from the storage
/// Errors coming from the storage backend
StorageError(Error),
/// Reply on the command channel
CommandChannelReply(Reply),
Expand Down
3 changes: 2 additions & 1 deletion src/server/controlchan/commands/dele.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,14 @@ where
let storage = Arc::clone(&session.storage);
let user = session.user.clone();
let path = session.cwd.join(self.path.clone());
let path_str = path.to_string_lossy().to_string();
let tx_success: Sender<ControlChanMsg> = args.tx_control_chan.clone();
let tx_fail: Sender<ControlChanMsg> = args.tx_control_chan.clone();
let logger = args.logger;
tokio::spawn(async move {
match storage.del((*user).as_ref().unwrap(), path).await {
Ok(_) => {
if let Err(err) = tx_success.send(ControlChanMsg::DelSuccess).await {
if let Err(err) = tx_success.send(ControlChanMsg::DelFileSuccess { path: path_str }).await {
slog::warn!(logger, "{}", err);
}
}
Expand Down
8 changes: 4 additions & 4 deletions src/server/controlchan/commands/mkd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,15 @@ where
let user = session.user.clone();
let storage = Arc::clone(&session.storage);
let path: PathBuf = session.cwd.join(self.path.clone());
let tx_success: Sender<ControlChanMsg> = args.tx_control_chan.clone();
let tx_fail: Sender<ControlChanMsg> = args.tx_control_chan.clone();
let path_str = path.to_string_lossy().to_string();
let tx: Sender<ControlChanMsg> = args.tx_control_chan.clone();
let logger = args.logger;
tokio::spawn(async move {
if let Err(err) = storage.mkd((*user).as_ref().unwrap(), &path).await {
if let Err(err) = tx_fail.send(ControlChanMsg::StorageError(err)).await {
if let Err(err) = tx.send(ControlChanMsg::StorageError(err)).await {
slog::warn!(logger, "{}", err);
}
} else if let Err(err) = tx_success.send(ControlChanMsg::MkdirSuccess(path)).await {
} else if let Err(err) = tx.send(ControlChanMsg::MkDirSuccess { path: path_str }).await {
slog::warn!(logger, "{}", err);
}
});
Expand Down
9 changes: 6 additions & 3 deletions src/server/controlchan/commands/pass.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ where
SessionState::WaitPass => {
let pass: &str = std::str::from_utf8(self.password.as_ref())?;
let pass: String = pass.to_string();
let user: String = match session.username.clone() {
let username: String = match session.username.clone() {
Some(v) => v,
None => {
slog::error!(logger, "NoneError for username. This shouldn't happen.");
Expand All @@ -74,13 +74,16 @@ where
certificate_chain: session.cert_chain.clone(),
};
tokio::spawn(async move {
let msg = match auther.authenticate(&user, &creds).await {
let msg = match auther.authenticate(&username, &creds).await {
Ok(user) => {
if user.account_enabled() {
let mut session = session2clone.lock().await;
slog::info!(logger, "User {} logged in", user);
session.user = Arc::new(Some(user));
ControlChanMsg::AuthSuccess
ControlChanMsg::AuthSuccess {
username,
trace_id: session.trace_id,
}
} else {
slog::warn!(logger, "User {} authenticated but account is disabled", user);
ControlChanMsg::AuthFailed
Expand Down
2 changes: 1 addition & 1 deletion src/server/controlchan/commands/quit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ where
let tx: Sender<ControlChanMsg> = args.tx_control_chan.clone();
let logger = args.logger;
// Let the control loop know it can exit.
if let Err(send_res) = tx.send(ControlChanMsg::Quit).await {
if let Err(send_res) = tx.send(ControlChanMsg::ExitControlLoop).await {
slog::warn!(logger, "could not send internal message: QUIT. {}", send_res);
}
Ok(Reply::new(ReplyCode::ClosingControlConnection, "Bye!"))
Expand Down
8 changes: 4 additions & 4 deletions src/server/controlchan/commands/rmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,17 @@ where
let session = args.session.lock().await;
let storage: Arc<Storage> = Arc::clone(&session.storage);
let path = session.cwd.join(self.path.clone());
let tx_success = args.tx_control_chan.clone();
let tx_fail = args.tx_control_chan.clone();
let path_str = path.to_string_lossy().to_string();
let tx = args.tx_control_chan.clone();
let logger = args.logger;
if let Err(err) = storage.rmd((*session.user).as_ref().unwrap(), path).await {
slog::warn!(logger, "Failed to delete directory: {}", err);
let r = tx_fail.send(ControlChanMsg::StorageError(err)).await;
let r = tx.send(ControlChanMsg::StorageError(err)).await;
if let Err(e) = r {
slog::warn!(logger, "Could not send internal message to notify of RMD error: {}", e);
}
} else {
let r = tx_success.send(ControlChanMsg::DelSuccess).await;
let r = tx.send(ControlChanMsg::RmDirSuccess { path: path_str }).await;
if let Err(e) = r {
slog::warn!(logger, "Could not send internal message to notify of RMD success: {}", e);
}
Expand Down
40 changes: 28 additions & 12 deletions src/server/controlchan/commands/rnto.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! The RFC 959 Rename To (`RNTO`) command

use crate::server::ControlChanMsg;
use crate::storage::{Metadata, StorageBackend};
use crate::{
auth::UserDetail,
Expand Down Expand Up @@ -32,22 +33,37 @@ where
{
#[tracing_attributes::instrument]
async fn handle(&self, args: CommandContext<Storage, User>) -> Result<Reply, ControlChanError> {
let mut session = args.session.lock().await;
let CommandContext {
logger,
session,
tx_control_chan,
..
} = args;
let mut session = session.lock().await;
let storage = Arc::clone(&session.storage);
let logger = args.logger;
let reply = match session.rename_from.take() {

let (from, to) = match session.rename_from.take() {
Some(from) => {
let to = session.cwd.join(self.path.clone());
match storage.rename((*session.user).as_ref().unwrap(), from, to).await {
Ok(_) => Reply::new(ReplyCode::FileActionOkay, "Renamed"),
Err(err) => {
slog::warn!(logger, "Error renaming: {:?}", err);
Reply::new(ReplyCode::FileError, "Storage error while renaming")
}
}
(from, to)
}
None => Reply::new(ReplyCode::TransientFileError, "Please tell me what file you want to rename first"),
None => return Ok(Reply::new(ReplyCode::TransientFileError, "Please tell me what file you want to rename first")),
};
Ok(reply)
let user = (*session.user).as_ref().unwrap();
let old_path = from.to_string_lossy().to_string();
let new_path = to.to_string_lossy().to_string();
match storage.rename(user, from, to).await {
Ok(_) => {
if let Err(err) = tx_control_chan.send(ControlChanMsg::RenameSuccess { old_path, new_path }).await {
slog::warn!(logger, "{}", err);
}
}
Err(err) => {
if let Err(err) = tx_control_chan.send(ControlChanMsg::StorageError(err)).await {
slog::warn!(logger, "{}", err);
}
}
}
Ok(Reply::none())
}
}
Loading

0 comments on commit 63b8ed0

Please sign in to comment.