From 22a7d8743cf487ed6b7d64ee0f1d02469da32231 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha <131262146+nikhilsinhaparseable@users.noreply.github.com> Date: Fri, 28 Jun 2024 14:32:49 +0530 Subject: [PATCH] enhancement in saved filters API (#833) - filter id generated by server - s3 calls optimized - added method for update filters --- server/src/handlers/http/modal/server.rs | 48 ++++---- server/src/handlers/http/users/filters.rs | 130 ++++++++-------------- server/src/storage/localfs.rs | 31 ++++++ server/src/storage/object_storage.rs | 1 + server/src/storage/s3.rs | 44 ++++++++ server/src/users/filters.rs | 86 +++++++------- 6 files changed, 196 insertions(+), 144 deletions(-) diff --git a/server/src/handlers/http/modal/server.rs b/server/src/handlers/http/modal/server.rs index bcb56b6ab..f5492d2ad 100644 --- a/server/src/handlers/http/modal/server.rs +++ b/server/src/handlers/http/modal/server.rs @@ -190,29 +190,33 @@ impl Server { // get the filters web scope pub fn get_filters_webscope() -> Scope { - web::scope("/filters").service( - web::scope("/{user_id}") - .service( - web::resource("") - .route(web::get().to(filters::list).authorize(Action::ListFilter)), - ) - .service( - web::scope("/{filter_id}").service( - web::resource("") - .route(web::get().to(filters::get).authorize(Action::GetFilter)) - .route( - web::post() - .to(filters::post) - .authorize(Action::CreateFilter), - ) - .route( - web::delete() - .to(filters::delete) - .authorize(Action::DeleteFilter), - ), - ), + web::scope("/filters") + .service( + web::resource("").route( + web::post() + .to(filters::post) + .authorize(Action::CreateFilter), ), - ) + ) + .service( + web::scope("/filter").service( + web::resource("/{filter_id}") + .route(web::get().to(filters::get).authorize(Action::GetFilter)) + .route( + web::delete() + .to(filters::delete) + .authorize(Action::DeleteFilter), + ) + .route( + web::put() + .to(filters::update) + .authorize(Action::CreateFilter), + ), + ), + ) + .service(web::scope("/{user_id}").service( + web::resource("").route(web::get().to(filters::list).authorize(Action::ListFilter)), + )) } // get the query factory diff --git a/server/src/handlers/http/users/filters.rs b/server/src/handlers/http/users/filters.rs index 49179667e..dca63949b 100644 --- a/server/src/handlers/http/users/filters.rs +++ b/server/src/handlers/http/users/filters.rs @@ -20,19 +20,19 @@ use crate::{ handlers::{http::ingest::PostError, STREAM_NAME_HEADER_KEY}, option::CONFIG, storage::{object_storage::filter_path, ObjectStorageError}, - users::filters::{Filter, FILTERS}, + users::filters::{Filter, CURRENT_FILTER_VERSION, FILTERS}, }; use actix_web::{http::header::ContentType, web, HttpRequest, HttpResponse, Responder}; use bytes::Bytes; use http::StatusCode; -use serde_json::{Error as SerdeError, Value as JsonValue}; +use rand::distributions::DistString; +use serde_json::Error as SerdeError; pub async fn list(req: HttpRequest) -> Result { let user_id = req .match_info() .get("user_id") .ok_or(FiltersError::Metadata("No User Id Provided"))?; - let stream_name = req .headers() .iter() @@ -41,117 +41,85 @@ pub async fn list(req: HttpRequest) -> Result { .1 .to_str() .map_err(|_| FiltersError::Metadata("Non ASCII Stream Name Provided"))?; + let filters = FILTERS.list_filters_by_user_and_stream(user_id, stream_name); - // .users/user_id/filters/stream_name/filter_id - let path = filter_path(user_id, stream_name, ""); - - let store = CONFIG.storage().get_object_store(); - let filters = store - .get_objects( - Some(&path), - Box::new(|file_name: String| file_name.ends_with("json")), - ) - .await?; - - let mut filt = vec![]; - for filter in filters { - filt.push(serde_json::from_slice::(&filter)?) - } - - Ok((web::Json(filt), StatusCode::OK)) + Ok((web::Json(filters), StatusCode::OK)) } pub async fn get(req: HttpRequest) -> Result { - let user_id = req - .match_info() - .get("user_id") - .ok_or(FiltersError::Metadata("No User Id Provided"))?; - - let filt_id = req + let filter_id = req .match_info() .get("filter_id") .ok_or(FiltersError::Metadata("No Filter Id Provided"))?; - let stream_name = req - .headers() - .iter() - .find(|&(key, _)| key == STREAM_NAME_HEADER_KEY) - .ok_or_else(|| FiltersError::Metadata("Stream Name Not Provided"))? - .1 - .to_str() - .map_err(|_| FiltersError::Metadata("Non ASCII Stream Name Provided"))?; - - if let Some(filter) = FILTERS.find(filt_id) { + if let Some(filter) = FILTERS.get_filter(filter_id) { return Ok((web::Json(filter), StatusCode::OK)); } - // if it is not in memory go to s3 - let path = filter_path(user_id, stream_name, &format!("{}.json", filt_id)); - let resource = CONFIG - .storage() - .get_object_store() - .get_object(&path) - .await?; + Err(FiltersError::Metadata("Filter Not Found")) +} - let resource = serde_json::from_slice::(&resource)?; +pub async fn post(body: Bytes) -> Result { + let filter: Filter = serde_json::from_slice(&body)?; + let filter_id = rand::distributions::Alphanumeric.sample_string(&mut rand::thread_rng(), 10); + let user_id = &filter.user_id; + let stream_name = &filter.stream_name; + let mut cloned_filter = filter.clone(); + cloned_filter.filter_id = Some(filter_id.clone()); + cloned_filter.version = Some(CURRENT_FILTER_VERSION.to_string()); + FILTERS.update(&cloned_filter); - Ok((web::Json(resource), StatusCode::OK)) -} + let path = filter_path(user_id, stream_name, &format!("{}.json", filter_id)); -pub async fn post(req: HttpRequest, body: Bytes) -> Result { - let user_id = req - .match_info() - .get("user_id") - .ok_or(FiltersError::Metadata("No User Id Provided"))?; + let store = CONFIG.storage().get_object_store(); + let filter_bytes = serde_json::to_vec(&cloned_filter)?; + store.put_object(&path, Bytes::from(filter_bytes)).await?; - let filt_id = req + Ok(HttpResponse::Ok().finish()) +} + +pub async fn update(req: HttpRequest, body: Bytes) -> Result { + let filter_id = req .match_info() .get("filter_id") .ok_or(FiltersError::Metadata("No Filter Id Provided"))?; + let filter = FILTERS + .get_filter(filter_id) + .ok_or(FiltersError::Metadata("Filter Not Found"))?; + let user_id = &filter.user_id; + let stream_name = &filter.stream_name; - let stream_name = req - .headers() - .iter() - .find(|&(key, _)| key == STREAM_NAME_HEADER_KEY) - .ok_or_else(|| FiltersError::Metadata("Stream Name Not Provided"))? - .1 - .to_str() - .map_err(|_| FiltersError::Metadata("Non ASCII Stream Name Provided"))?; + let mut cloned_filter: Filter = serde_json::from_slice(&body)?; + cloned_filter.filter_id = Some(filter_id.to_string()); + cloned_filter.version = Some(CURRENT_FILTER_VERSION.to_string()); + FILTERS.update(&cloned_filter); - let path = filter_path(user_id, stream_name, &format!("{}.json", filt_id)); - let filter: Filter = serde_json::from_slice(&body)?; - FILTERS.update(filter); + let path = filter_path(user_id, stream_name, &format!("{}.json", filter_id)); let store = CONFIG.storage().get_object_store(); - store.put_object(&path, body).await?; + let filter_bytes = serde_json::to_vec(&cloned_filter)?; + store.put_object(&path, Bytes::from(filter_bytes)).await?; Ok(HttpResponse::Ok().finish()) } pub async fn delete(req: HttpRequest) -> Result { - let user_id = req - .match_info() - .get("user_id") - .ok_or(FiltersError::Metadata("No User Id Provided"))?; - - let filt_id = req + let filter_id = req .match_info() .get("filter_id") .ok_or(FiltersError::Metadata("No Filter Id Provided"))?; + let filter = FILTERS + .get_filter(filter_id) + .ok_or(FiltersError::Metadata("Filter Not Found"))?; + let stream_name = &filter.stream_name; + let user_id = &filter.user_id; - let stream_name = req - .headers() - .iter() - .find(|&(key, _)| key == STREAM_NAME_HEADER_KEY) - .ok_or_else(|| FiltersError::Metadata("Stream Name Not Provided"))? - .1 - .to_str() - .map_err(|_| FiltersError::Metadata("Non ASCII Stream Name Provided"))?; - - let path = filter_path(user_id, stream_name, &format!("{}.json", filt_id)); + let path = filter_path(user_id, stream_name, &format!("{}.json", filter_id)); let store = CONFIG.storage().get_object_store(); store.delete_object(&path).await?; + FILTERS.delete_filter(filter_id); + Ok(HttpResponse::Ok().finish()) } @@ -161,7 +129,7 @@ pub enum FiltersError { ObjectStorage(#[from] ObjectStorageError), #[error("Serde Error: {0}")] Serde(#[from] SerdeError), - #[error("Cannot perform this operation: {0}")] + #[error("Operation cannot be performed: {0}")] Metadata(&'static str), } diff --git a/server/src/storage/localfs.rs b/server/src/storage/localfs.rs index 9fb72946e..5f396bf3a 100644 --- a/server/src/storage/localfs.rs +++ b/server/src/storage/localfs.rs @@ -350,6 +350,37 @@ impl ObjectStorage for LocalFS { Ok(dirs) } + async fn get_all_saved_filters(&self) -> Result, ObjectStorageError> { + let mut filters = vec![]; + let users_root_path = self.root.join(USERS_ROOT_DIR); + let directories = ReadDirStream::new(fs::read_dir(&users_root_path).await?); + let users: Vec = directories.try_collect().await?; + for user in users { + if !user.path().is_dir() { + continue; + } + let stream_root_path = users_root_path.join(user.path()).join("filters"); + let directories = ReadDirStream::new(fs::read_dir(&stream_root_path).await?); + let streams: Vec = directories.try_collect().await?; + for stream in streams { + if !stream.path().is_dir() { + continue; + } + let filters_path = users_root_path + .join(user.path()) + .join("filters") + .join(stream.path()); + let directories = ReadDirStream::new(fs::read_dir(&filters_path).await?); + let filters_files: Vec = directories.try_collect().await?; + for filter in filters_files { + let file = fs::read(filter.path()).await?; + filters.push(file.into()); + } + } + } + Ok(filters) + } + async fn list_dates(&self, stream_name: &str) -> Result, ObjectStorageError> { let path = self.root.join(stream_name); let directories = ReadDirStream::new(fs::read_dir(&path).await?); diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs index 7c0dcda7a..1f2a96156 100644 --- a/server/src/storage/object_storage.rs +++ b/server/src/storage/object_storage.rs @@ -84,6 +84,7 @@ pub trait ObjectStorage: Sync + 'static { async fn list_streams(&self) -> Result, ObjectStorageError>; async fn list_old_streams(&self) -> Result, ObjectStorageError>; async fn list_dirs(&self) -> Result, ObjectStorageError>; + async fn get_all_saved_filters(&self) -> Result, ObjectStorageError>; async fn list_dates(&self, stream_name: &str) -> Result, ObjectStorageError>; async fn upload_file(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError>; async fn delete_object(&self, path: &RelativePath) -> Result<(), ObjectStorageError>; diff --git a/server/src/storage/s3.rs b/server/src/storage/s3.rs index 00ac3c01c..d7729eb92 100644 --- a/server/src/storage/s3.rs +++ b/server/src/storage/s3.rs @@ -640,6 +640,50 @@ impl ObjectStorage for S3 { .collect::>()) } + async fn get_all_saved_filters(&self) -> Result, ObjectStorageError> { + let mut filters = vec![]; + let users_root_path = object_store::path::Path::from(USERS_ROOT_DIR); + let resp = self + .client + .list_with_delimiter(Some(&users_root_path)) + .await?; + + let users = resp + .common_prefixes + .iter() + .flat_map(|path| path.parts()) + .filter(|name| name.as_ref() != USERS_ROOT_DIR) + .map(|name| name.as_ref().to_string()) + .collect::>(); + for user in users { + let user_filters_path = object_store::path::Path::from(format!( + "{}/{}/{}", + USERS_ROOT_DIR, user, "filters" + )); + let resp = self + .client + .list_with_delimiter(Some(&user_filters_path)) + .await?; + let streams = resp + .common_prefixes + .iter() + .filter(|name| name.as_ref() != USERS_ROOT_DIR) + .map(|name| name.as_ref().to_string()) + .collect::>(); + for stream in streams { + let filters_path = RelativePathBuf::from(&stream); + let filter_bytes = self + .get_objects( + Some(&filters_path), + Box::new(|file_name| file_name.ends_with(".json")), + ) + .await?; + filters.extend(filter_bytes); + } + } + Ok(filters) + } + fn get_bucket_name(&self) -> String { self.bucket.clone() } diff --git a/server/src/users/filters.rs b/server/src/users/filters.rs index 6b2577fd0..58d51a76a 100644 --- a/server/src/users/filters.rs +++ b/server/src/users/filters.rs @@ -17,58 +17,52 @@ */ use once_cell::sync::Lazy; -use relative_path::RelativePathBuf; use serde::{Deserialize, Serialize}; use std::sync::RwLock; use super::TimeFilter; -use crate::{handlers::http::users::USERS_ROOT_DIR, metadata::LOCK_EXPECT, option::CONFIG}; +use crate::{metadata::LOCK_EXPECT, option::CONFIG}; pub static FILTERS: Lazy = Lazy::new(Filters::default); - +pub const CURRENT_FILTER_VERSION: &str = "v1"; #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] pub struct Filter { - version: String, - stream_name: String, - filter_name: String, - filter_id: String, - query: FilterQuery, - time_filter: Option, + pub version: Option, + pub user_id: String, + pub stream_name: String, + pub filter_name: String, + pub filter_id: Option, + pub query: FilterQuery, + pub time_filter: Option, } #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] pub struct FilterQuery { - filter_type: String, - filter_query: Option, - filter_builder: Option, + pub filter_type: String, + pub filter_query: Option, + pub filter_builder: Option, } #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] pub struct FilterBuilder { - id: String, - combinator: String, - rules: Vec, + pub id: String, + pub combinator: String, + pub rules: Vec, } #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] pub struct FilterRules { - id: String, - combinator: String, - rules: Vec, + pub id: String, + pub combinator: String, + pub rules: Vec, } #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] pub struct Rules { - id: String, - field: String, - value: String, - operator: String, -} - -impl Filter { - pub fn filter_id(&self) -> &str { - &self.filter_id - } + pub id: String, + pub field: String, + pub value: String, + pub operator: String, } #[derive(Debug, Default)] @@ -77,16 +71,11 @@ pub struct Filters(RwLock>); impl Filters { pub async fn load(&self) -> anyhow::Result<()> { let mut this = vec![]; - let path = RelativePathBuf::from(USERS_ROOT_DIR); let store = CONFIG.storage().get_object_store(); + let filters = store.get_all_saved_filters().await.unwrap_or_default(); - let objs = store - .get_objects(Some(&path), Box::new(|path| path.ends_with(".json"))) - .await - .unwrap_or_default(); - - for obj in objs { - if let Ok(filter) = serde_json::from_slice::(&obj) { + for filter in filters { + if let Ok(filter) = serde_json::from_slice::(&filter) { this.push(filter); } } @@ -97,18 +86,33 @@ impl Filters { Ok(()) } - pub fn update(&self, filter: Filter) { + pub fn update(&self, filter: &Filter) { + let mut s = self.0.write().expect(LOCK_EXPECT); + s.retain(|f| f.filter_id != filter.filter_id); + s.push(filter.clone()); + } + + pub fn delete_filter(&self, filter_id: &str) { let mut s = self.0.write().expect(LOCK_EXPECT); - s.retain(|f| f.filter_id() != filter.filter_id()); - s.push(filter); + s.retain(|f| f.filter_id != Some(filter_id.to_string())); + } + + pub fn get_filter(&self, filter_id: &str) -> Option { + self.0 + .read() + .expect(LOCK_EXPECT) + .iter() + .find(|f| f.filter_id == Some(filter_id.to_string())) + .cloned() } - pub fn find(&self, filter_id: &str) -> Option { + pub fn list_filters_by_user_and_stream(&self, user_id: &str, stream_name: &str) -> Vec { self.0 .read() .expect(LOCK_EXPECT) .iter() - .find(|filter| filter.filter_id() == filter_id) + .filter(|f| f.user_id == user_id && f.stream_name == stream_name) .cloned() + .collect() } }