Skip to content

Commit

Permalink
enhancement in saved filters API (#833)
Browse files Browse the repository at this point in the history
- filter id  generated by server
- s3 calls optimized
- added method for update filters
  • Loading branch information
nikhilsinhaparseable authored Jun 28, 2024
1 parent c2164eb commit 22a7d87
Show file tree
Hide file tree
Showing 6 changed files with 196 additions and 144 deletions.
48 changes: 26 additions & 22 deletions server/src/handlers/http/modal/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,29 +190,33 @@ impl Server {

// get the filters web scope
pub fn get_filters_webscope() -> Scope {
web::scope("/filters").service(
web::scope("/{user_id}")
.service(
web::resource("")
.route(web::get().to(filters::list).authorize(Action::ListFilter)),
)
.service(
web::scope("/{filter_id}").service(
web::resource("")
.route(web::get().to(filters::get).authorize(Action::GetFilter))
.route(
web::post()
.to(filters::post)
.authorize(Action::CreateFilter),
)
.route(
web::delete()
.to(filters::delete)
.authorize(Action::DeleteFilter),
),
),
web::scope("/filters")
.service(
web::resource("").route(
web::post()
.to(filters::post)
.authorize(Action::CreateFilter),
),
)
)
.service(
web::scope("/filter").service(
web::resource("/{filter_id}")
.route(web::get().to(filters::get).authorize(Action::GetFilter))
.route(
web::delete()
.to(filters::delete)
.authorize(Action::DeleteFilter),
)
.route(
web::put()
.to(filters::update)
.authorize(Action::CreateFilter),
),
),
)
.service(web::scope("/{user_id}").service(
web::resource("").route(web::get().to(filters::list).authorize(Action::ListFilter)),
))
}

// get the query factory
Expand Down
130 changes: 49 additions & 81 deletions server/src/handlers/http/users/filters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,19 @@ use crate::{
handlers::{http::ingest::PostError, STREAM_NAME_HEADER_KEY},
option::CONFIG,
storage::{object_storage::filter_path, ObjectStorageError},
users::filters::{Filter, FILTERS},
users::filters::{Filter, CURRENT_FILTER_VERSION, FILTERS},
};
use actix_web::{http::header::ContentType, web, HttpRequest, HttpResponse, Responder};
use bytes::Bytes;
use http::StatusCode;
use serde_json::{Error as SerdeError, Value as JsonValue};
use rand::distributions::DistString;
use serde_json::Error as SerdeError;

pub async fn list(req: HttpRequest) -> Result<impl Responder, FiltersError> {
let user_id = req
.match_info()
.get("user_id")
.ok_or(FiltersError::Metadata("No User Id Provided"))?;

let stream_name = req
.headers()
.iter()
Expand All @@ -41,117 +41,85 @@ pub async fn list(req: HttpRequest) -> Result<impl Responder, FiltersError> {
.1
.to_str()
.map_err(|_| FiltersError::Metadata("Non ASCII Stream Name Provided"))?;
let filters = FILTERS.list_filters_by_user_and_stream(user_id, stream_name);

// .users/user_id/filters/stream_name/filter_id
let path = filter_path(user_id, stream_name, "");

let store = CONFIG.storage().get_object_store();
let filters = store
.get_objects(
Some(&path),
Box::new(|file_name: String| file_name.ends_with("json")),
)
.await?;

let mut filt = vec![];
for filter in filters {
filt.push(serde_json::from_slice::<JsonValue>(&filter)?)
}

Ok((web::Json(filt), StatusCode::OK))
Ok((web::Json(filters), StatusCode::OK))
}

pub async fn get(req: HttpRequest) -> Result<impl Responder, FiltersError> {
let user_id = req
.match_info()
.get("user_id")
.ok_or(FiltersError::Metadata("No User Id Provided"))?;

let filt_id = req
let filter_id = req
.match_info()
.get("filter_id")
.ok_or(FiltersError::Metadata("No Filter Id Provided"))?;

let stream_name = req
.headers()
.iter()
.find(|&(key, _)| key == STREAM_NAME_HEADER_KEY)
.ok_or_else(|| FiltersError::Metadata("Stream Name Not Provided"))?
.1
.to_str()
.map_err(|_| FiltersError::Metadata("Non ASCII Stream Name Provided"))?;

if let Some(filter) = FILTERS.find(filt_id) {
if let Some(filter) = FILTERS.get_filter(filter_id) {
return Ok((web::Json(filter), StatusCode::OK));
}

// if it is not in memory go to s3
let path = filter_path(user_id, stream_name, &format!("{}.json", filt_id));
let resource = CONFIG
.storage()
.get_object_store()
.get_object(&path)
.await?;
Err(FiltersError::Metadata("Filter Not Found"))
}

let resource = serde_json::from_slice::<Filter>(&resource)?;
pub async fn post(body: Bytes) -> Result<HttpResponse, PostError> {
let filter: Filter = serde_json::from_slice(&body)?;
let filter_id = rand::distributions::Alphanumeric.sample_string(&mut rand::thread_rng(), 10);
let user_id = &filter.user_id;
let stream_name = &filter.stream_name;
let mut cloned_filter = filter.clone();
cloned_filter.filter_id = Some(filter_id.clone());
cloned_filter.version = Some(CURRENT_FILTER_VERSION.to_string());
FILTERS.update(&cloned_filter);

Ok((web::Json(resource), StatusCode::OK))
}
let path = filter_path(user_id, stream_name, &format!("{}.json", filter_id));

pub async fn post(req: HttpRequest, body: Bytes) -> Result<HttpResponse, PostError> {
let user_id = req
.match_info()
.get("user_id")
.ok_or(FiltersError::Metadata("No User Id Provided"))?;
let store = CONFIG.storage().get_object_store();
let filter_bytes = serde_json::to_vec(&cloned_filter)?;
store.put_object(&path, Bytes::from(filter_bytes)).await?;

let filt_id = req
Ok(HttpResponse::Ok().finish())
}

pub async fn update(req: HttpRequest, body: Bytes) -> Result<HttpResponse, PostError> {
let filter_id = req
.match_info()
.get("filter_id")
.ok_or(FiltersError::Metadata("No Filter Id Provided"))?;
let filter = FILTERS
.get_filter(filter_id)
.ok_or(FiltersError::Metadata("Filter Not Found"))?;
let user_id = &filter.user_id;
let stream_name = &filter.stream_name;

let stream_name = req
.headers()
.iter()
.find(|&(key, _)| key == STREAM_NAME_HEADER_KEY)
.ok_or_else(|| FiltersError::Metadata("Stream Name Not Provided"))?
.1
.to_str()
.map_err(|_| FiltersError::Metadata("Non ASCII Stream Name Provided"))?;
let mut cloned_filter: Filter = serde_json::from_slice(&body)?;
cloned_filter.filter_id = Some(filter_id.to_string());
cloned_filter.version = Some(CURRENT_FILTER_VERSION.to_string());
FILTERS.update(&cloned_filter);

let path = filter_path(user_id, stream_name, &format!("{}.json", filt_id));
let filter: Filter = serde_json::from_slice(&body)?;
FILTERS.update(filter);
let path = filter_path(user_id, stream_name, &format!("{}.json", filter_id));

let store = CONFIG.storage().get_object_store();
store.put_object(&path, body).await?;
let filter_bytes = serde_json::to_vec(&cloned_filter)?;
store.put_object(&path, Bytes::from(filter_bytes)).await?;

Ok(HttpResponse::Ok().finish())
}

pub async fn delete(req: HttpRequest) -> Result<HttpResponse, PostError> {
let user_id = req
.match_info()
.get("user_id")
.ok_or(FiltersError::Metadata("No User Id Provided"))?;

let filt_id = req
let filter_id = req
.match_info()
.get("filter_id")
.ok_or(FiltersError::Metadata("No Filter Id Provided"))?;
let filter = FILTERS
.get_filter(filter_id)
.ok_or(FiltersError::Metadata("Filter Not Found"))?;
let stream_name = &filter.stream_name;
let user_id = &filter.user_id;

let stream_name = req
.headers()
.iter()
.find(|&(key, _)| key == STREAM_NAME_HEADER_KEY)
.ok_or_else(|| FiltersError::Metadata("Stream Name Not Provided"))?
.1
.to_str()
.map_err(|_| FiltersError::Metadata("Non ASCII Stream Name Provided"))?;

let path = filter_path(user_id, stream_name, &format!("{}.json", filt_id));
let path = filter_path(user_id, stream_name, &format!("{}.json", filter_id));
let store = CONFIG.storage().get_object_store();
store.delete_object(&path).await?;

FILTERS.delete_filter(filter_id);

Ok(HttpResponse::Ok().finish())
}

Expand All @@ -161,7 +129,7 @@ pub enum FiltersError {
ObjectStorage(#[from] ObjectStorageError),
#[error("Serde Error: {0}")]
Serde(#[from] SerdeError),
#[error("Cannot perform this operation: {0}")]
#[error("Operation cannot be performed: {0}")]
Metadata(&'static str),
}

Expand Down
31 changes: 31 additions & 0 deletions server/src/storage/localfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,37 @@ impl ObjectStorage for LocalFS {
Ok(dirs)
}

async fn get_all_saved_filters(&self) -> Result<Vec<Bytes>, ObjectStorageError> {
let mut filters = vec![];
let users_root_path = self.root.join(USERS_ROOT_DIR);
let directories = ReadDirStream::new(fs::read_dir(&users_root_path).await?);
let users: Vec<DirEntry> = directories.try_collect().await?;
for user in users {
if !user.path().is_dir() {
continue;
}
let stream_root_path = users_root_path.join(user.path()).join("filters");
let directories = ReadDirStream::new(fs::read_dir(&stream_root_path).await?);
let streams: Vec<DirEntry> = directories.try_collect().await?;
for stream in streams {
if !stream.path().is_dir() {
continue;
}
let filters_path = users_root_path
.join(user.path())
.join("filters")
.join(stream.path());
let directories = ReadDirStream::new(fs::read_dir(&filters_path).await?);
let filters_files: Vec<DirEntry> = directories.try_collect().await?;
for filter in filters_files {
let file = fs::read(filter.path()).await?;
filters.push(file.into());
}
}
}
Ok(filters)
}

async fn list_dates(&self, stream_name: &str) -> Result<Vec<String>, ObjectStorageError> {
let path = self.root.join(stream_name);
let directories = ReadDirStream::new(fs::read_dir(&path).await?);
Expand Down
1 change: 1 addition & 0 deletions server/src/storage/object_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ pub trait ObjectStorage: Sync + 'static {
async fn list_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError>;
async fn list_old_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError>;
async fn list_dirs(&self) -> Result<Vec<String>, ObjectStorageError>;
async fn get_all_saved_filters(&self) -> Result<Vec<Bytes>, ObjectStorageError>;
async fn list_dates(&self, stream_name: &str) -> Result<Vec<String>, ObjectStorageError>;
async fn upload_file(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError>;
async fn delete_object(&self, path: &RelativePath) -> Result<(), ObjectStorageError>;
Expand Down
44 changes: 44 additions & 0 deletions server/src/storage/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,50 @@ impl ObjectStorage for S3 {
.collect::<Vec<_>>())
}

async fn get_all_saved_filters(&self) -> Result<Vec<Bytes>, ObjectStorageError> {
let mut filters = vec![];
let users_root_path = object_store::path::Path::from(USERS_ROOT_DIR);
let resp = self
.client
.list_with_delimiter(Some(&users_root_path))
.await?;

let users = resp
.common_prefixes
.iter()
.flat_map(|path| path.parts())
.filter(|name| name.as_ref() != USERS_ROOT_DIR)
.map(|name| name.as_ref().to_string())
.collect::<Vec<_>>();
for user in users {
let user_filters_path = object_store::path::Path::from(format!(
"{}/{}/{}",
USERS_ROOT_DIR, user, "filters"
));
let resp = self
.client
.list_with_delimiter(Some(&user_filters_path))
.await?;
let streams = resp
.common_prefixes
.iter()
.filter(|name| name.as_ref() != USERS_ROOT_DIR)
.map(|name| name.as_ref().to_string())
.collect::<Vec<_>>();
for stream in streams {
let filters_path = RelativePathBuf::from(&stream);
let filter_bytes = self
.get_objects(
Some(&filters_path),
Box::new(|file_name| file_name.ends_with(".json")),
)
.await?;
filters.extend(filter_bytes);
}
}
Ok(filters)
}

fn get_bucket_name(&self) -> String {
self.bucket.clone()
}
Expand Down
Loading

0 comments on commit 22a7d87

Please sign in to comment.