From afaf87f1ca510953e2ad69d391cd2bc8db1cf64f Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Mon, 13 May 2024 15:22:44 +0530 Subject: [PATCH 01/24] add the dashboards api --- server/src/handlers/http.rs | 1 + server/src/handlers/http/ingest.rs | 5 + .../src/handlers/http/modal/query_server.rs | 1 + server/src/handlers/http/modal/server.rs | 36 +++++ server/src/handlers/http/users/dashboards.rs | 140 ++++++++++++++++++ server/src/handlers/http/users/mod.rs | 4 + server/src/rbac/role.rs | 8 + server/src/storage/object_storage.rs | 9 +- 8 files changed, 203 insertions(+), 1 deletion(-) create mode 100644 server/src/handlers/http/users/dashboards.rs create mode 100644 server/src/handlers/http/users/mod.rs diff --git a/server/src/handlers/http.rs b/server/src/handlers/http.rs index 6044b74ac..200ba885e 100644 --- a/server/src/handlers/http.rs +++ b/server/src/handlers/http.rs @@ -39,6 +39,7 @@ mod otel; pub(crate) mod query; pub(crate) mod rbac; pub(crate) mod role; +pub mod users; pub const MAX_EVENT_PAYLOAD_SIZE: usize = 10485760; pub const API_BASE_PATH: &str = "api"; diff --git a/server/src/handlers/http/ingest.rs b/server/src/handlers/http/ingest.rs index a5157f34c..7137a54d2 100644 --- a/server/src/handlers/http/ingest.rs +++ b/server/src/handlers/http/ingest.rs @@ -18,6 +18,7 @@ use super::cluster::INTERNAL_STREAM_NAME; use super::logstream::error::CreateStreamError; +use super::users::dashboards::DashboardError; use super::{kinesis, otel}; use crate::event::{ self, @@ -436,6 +437,9 @@ pub enum PostError { NetworkError(#[from] reqwest::Error), #[error("ObjectStorageError: {0}")] ObjectStorageError(#[from] ObjectStorageError), + + #[error("Error: {0}")] + DashboardError(#[from] DashboardError), } impl actix_web::ResponseError for PostError { @@ -453,6 +457,7 @@ impl actix_web::ResponseError for PostError { PostError::CustomError(_) => StatusCode::INTERNAL_SERVER_ERROR, PostError::NetworkError(_) => StatusCode::INTERNAL_SERVER_ERROR, PostError::ObjectStorageError(_) => StatusCode::INTERNAL_SERVER_ERROR, + PostError::DashboardError(_) => StatusCode::INTERNAL_SERVER_ERROR, } } diff --git a/server/src/handlers/http/modal/query_server.rs b/server/src/handlers/http/modal/query_server.rs index 0abfc7ff0..be66fe71e 100644 --- a/server/src/handlers/http/modal/query_server.rs +++ b/server/src/handlers/http/modal/query_server.rs @@ -124,6 +124,7 @@ impl QueryServer { .service(Server::get_about_factory()) .service(Server::get_logstream_webscope()) .service(Server::get_user_webscope()) + .service(Server::get_dashboards_webscope()) .service(Server::get_llm_webscope()) .service(Server::get_oauth_webscope(oidc_client)) .service(Server::get_user_role_webscope()) diff --git a/server/src/handlers/http/modal/server.rs b/server/src/handlers/http/modal/server.rs index 7b89dd9b9..f1b2afeb0 100644 --- a/server/src/handlers/http/modal/server.rs +++ b/server/src/handlers/http/modal/server.rs @@ -23,6 +23,7 @@ use crate::handlers::http::about; use crate::handlers::http::base_path; use crate::handlers::http::health_check; use crate::handlers::http::query; +use crate::handlers::http::users::dashboards; use crate::handlers::http::API_BASE_PATH; use crate::handlers::http::API_VERSION; use crate::localcache::LocalCacheManager; @@ -139,6 +140,7 @@ impl Server { .service(Self::get_about_factory()) .service(Self::get_logstream_webscope()) .service(Self::get_user_webscope()) + .service(Self::get_dashboards_webscope()) .service(Self::get_llm_webscope()) .service(Self::get_oauth_webscope(oidc_client)) 
.service(Self::get_user_role_webscope()), @@ -146,6 +148,40 @@ impl Server { .service(Self::get_generated()); } + // get the dashboards web scope + pub fn get_dashboards_webscope() -> Scope { + web::scope("/dashboards").service( + web::scope("/{user_id}") + .service( + web::resource("").route( + web::get() + .to(dashboards::list) + .authorize(Action::ListDashboard), + ), + ) + .service( + web::scope("/{dashboard_id}").service( + web::resource("") + .route( + web::get() + .to(dashboards::get) + .authorize(Action::GetDashboard), + ) + .route( + web::post() + .to(dashboards::post) + .authorize(Action::CreateDashboard), + ) + .route( + web::delete() + .to(dashboards::delete) + .authorize(Action::DeleteDashboard), + ), + ), + ), + ) + } + // get the query factory pub fn get_query_factory() -> Resource { web::resource("/query").route(web::post().to(query::query).authorize(Action::Query)) diff --git a/server/src/handlers/http/users/dashboards.rs b/server/src/handlers/http/users/dashboards.rs new file mode 100644 index 000000000..2478b1dfa --- /dev/null +++ b/server/src/handlers/http/users/dashboards.rs @@ -0,0 +1,140 @@ +use crate::{ + handlers::http::ingest::PostError, + option::CONFIG, + storage::{object_storage::dashboard_path, ObjectStorageError}, +}; +use actix_web::{http::header::ContentType, web, HttpRequest, HttpResponse, Responder}; +use bytes::Bytes; +use http::StatusCode; +use serde_json::{Error as SerdeError, Value as JsonValue}; + +pub async fn list(req: HttpRequest) -> Result { + let user_id = req + .match_info() + .get("user_id") + .ok_or(DashboardError::Metadata("No User Id Provided"))?; + + // .users/user_id/dashboards/ + let path = dashboard_path(user_id, ""); + + let store = CONFIG.storage().get_object_store(); + let dashboards = store + .get_objects( + Some(&path), + Box::new(|file_name: String| file_name.ends_with("json")), + ) + .await?; + + let mut dash = vec![]; + for dashboard in dashboards { + dash.push(serde_json::from_slice::(&dashboard)?) 
+    }
+
+    Ok((web::Json(dash), StatusCode::OK))
+}
+
+pub async fn get(req: HttpRequest) -> Result<impl Responder, PostError> {
+    let user_id = req
+        .match_info()
+        .get("user_id")
+        .ok_or(DashboardError::Metadata("No User Id Provided"))?;
+
+    let dash_id = req
+        .match_info()
+        .get("dashboard_id")
+        .ok_or(DashboardError::Metadata("No Dashboard Id Provided"))?;
+
+    let dash_file_path = dashboard_path(user_id, &format!("{}.json", dash_id));
+    let resource = CONFIG
+        .storage()
+        .get_object_store()
+        .get_object(&dash_file_path)
+        .await?;
+    let resource = serde_json::from_slice::<JsonValue>(&resource)?;
+
+    Ok((web::Json(resource), StatusCode::OK))
+}
+
+pub async fn post(req: HttpRequest, body: Bytes) -> Result<impl Responder, PostError> {
+    let user_id = req
+        .match_info()
+        .get("user_id")
+        .ok_or(DashboardError::Metadata("No User Id Provided"))?;
+
+    let dash_id = req
+        .match_info()
+        .get("dashboard_id")
+        .ok_or(DashboardError::Metadata("No Dashboard Id Provided"))?;
+
+    let dash_file_path = dashboard_path(user_id, &format!("{}.json", dash_id));
+
+    let store = CONFIG.storage().get_object_store();
+    store.put_object(&dash_file_path, body).await?;
+
+    Ok(HttpResponse::Ok().finish())
+}
+
+pub async fn delete(req: HttpRequest) -> Result<impl Responder, PostError> {
+    let user_id = req
+        .match_info()
+        .get("user_id")
+        .ok_or(DashboardError::Metadata("No User Id Provided"))?;
+
+    let dash_id = req
+        .match_info()
+        .get("dashboard_id")
+        .ok_or(DashboardError::Metadata("No Dashboard Id Provided"))?;
+
+    let dash_file_path = dashboard_path(user_id, &format!("{}.json", dash_id));
+
+    let store = CONFIG.storage().get_object_store();
+    store.delete_object(&dash_file_path).await?;
+
+    Ok(HttpResponse::Ok().finish())
+}
+
+// #[derive(Debug, Serialize, Deserialize)]
+// pub struct Dashboard {
+//     version: String,
+//     name: String,
+//     id: String,
+//     time-filter: `type_not_defined`
+//     refresh_interval: u64,
+//     pannels: Vec<Pannel>,
+// }
+//
+// #[derive(Debug, Serialize, Deserialize)]
+// pub struct Pannel {
+//     stream_name: String,
+//     query: String,
+//     chart_type: String,
+//     columns: Vec<String>,
+//     headers: Vec<String>,
+//     dimensions: (u64, u64),
+// }
+
+#[derive(Debug, thiserror::Error)]
+pub enum DashboardError {
+    #[error("Failed to connect to storage: {0}")]
+    ObjectStorage(#[from] ObjectStorageError),
+    #[error("Serde Error: {0}")]
+    Serde(#[from] SerdeError),
+    #[error("Cannot perform this operation: {0}")]
+    Metadata(&'static str),
+}
+
+impl actix_web::ResponseError for DashboardError {
+    fn status_code(&self) -> http::StatusCode {
+        match self {
+            Self::ObjectStorage(_) => StatusCode::INTERNAL_SERVER_ERROR,
+            Self::Serde(_) => StatusCode::BAD_REQUEST,
+            Self::Metadata(_) => StatusCode::BAD_REQUEST,
+        }
+    }
+
+    fn error_response(&self) -> actix_web::HttpResponse {
+        actix_web::HttpResponse::build(self.status_code())
+            .insert_header(ContentType::plaintext())
+            .body(self.to_string())
+    }
+}
diff --git a/server/src/handlers/http/users/mod.rs b/server/src/handlers/http/users/mod.rs
new file mode 100644
index 000000000..547da2fef
--- /dev/null
+++ b/server/src/handlers/http/users/mod.rs
@@ -0,0 +1,4 @@
+pub mod dashboards;
+
+pub const USERS_ROOT_DIR: &str = ".users";
+pub const DASHBOARDS_DIR: &str = "dashboards";
diff --git a/server/src/rbac/role.rs b/server/src/rbac/role.rs
index ee17bea5c..aced0c288 100644
--- a/server/src/rbac/role.rs
+++ b/server/src/rbac/role.rs
@@ -50,6 +50,10 @@ pub enum Action {
     Deleteingestor,
     All,
     GetAnalytics,
+    ListDashboard,
+    GetDashboard,
+    CreateDashboard,
+    DeleteDashboard,
 }
 
 #[derive(Debug, Clone, PartialEq, Eq, Hash)]
@@ -107,6 +111,10 @@ impl
RoleBuilder { | Action::ListCluster | Action::ListClusterMetrics | Action::Deleteingestor + | Action::ListDashboard + | Action::GetDashboard + | Action::CreateDashboard + | Action::DeleteDashboard | Action::GetAnalytics => Permission::Unit(action), Action::Ingest | Action::GetSchema diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs index f5945b86d..f702d510e 100644 --- a/server/src/storage/object_storage.rs +++ b/server/src/storage/object_storage.rs @@ -27,6 +27,7 @@ use super::{ use crate::handlers::http::modal::ingest_server::INGESTOR_META; use crate::metrics::{LIFETIME_EVENTS_STORAGE_SIZE, STORAGE_SIZE_TODAY}; +use crate::handlers::http::users::{DASHBOARDS_DIR, USERS_ROOT_DIR}; use crate::option::Mode; use crate::{ alerts::Alerts, @@ -66,7 +67,7 @@ pub trait ObjectStorageProvider: StorageMetrics + std::fmt::Debug { #[async_trait] pub trait ObjectStorage: Sync + 'static { async fn get_object(&self, path: &RelativePath) -> Result; - // want to make it more generic with a filter function + // TODO: make the filter function optional as we may want to get all objects async fn get_objects( &self, base_path: Option<&RelativePath>, @@ -603,6 +604,12 @@ pub fn stream_json_path(stream_name: &str) -> RelativePathBuf { } } +/// if dashboard_id is an empty str it should not append it to the rel path +#[inline(always)] +pub fn dashboard_path(user_id: &str, dashboard_id: &str) -> RelativePathBuf { + RelativePathBuf::from_iter([USERS_ROOT_DIR, user_id, DASHBOARDS_DIR, dashboard_id]) +} + /// path will be ".parseable/.parsable.json" #[inline(always)] pub fn parseable_json_path() -> RelativePathBuf { From 4b22348a256a0ec9a1f8b26ea964b98fbed0b1e9 Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Mon, 13 May 2024 16:24:02 +0530 Subject: [PATCH 02/24] add: filters api --- server/src/handlers/http/ingest.rs | 5 +- .../src/handlers/http/modal/query_server.rs | 1 + server/src/handlers/http/modal/server.rs | 29 +++ server/src/handlers/http/users/filters.rs | 165 ++++++++++++++++++ server/src/handlers/http/users/mod.rs | 2 + server/src/rbac/role.rs | 8 + server/src/storage/object_storage.rs | 18 +- 7 files changed, 224 insertions(+), 4 deletions(-) create mode 100644 server/src/handlers/http/users/filters.rs diff --git a/server/src/handlers/http/ingest.rs b/server/src/handlers/http/ingest.rs index 7137a54d2..b442b6efc 100644 --- a/server/src/handlers/http/ingest.rs +++ b/server/src/handlers/http/ingest.rs @@ -19,6 +19,7 @@ use super::cluster::INTERNAL_STREAM_NAME; use super::logstream::error::CreateStreamError; use super::users::dashboards::DashboardError; +use super::users::filters::FiltersError; use super::{kinesis, otel}; use crate::event::{ self, @@ -437,7 +438,8 @@ pub enum PostError { NetworkError(#[from] reqwest::Error), #[error("ObjectStorageError: {0}")] ObjectStorageError(#[from] ObjectStorageError), - + #[error("Error: {0}")] + FiltersError(#[from] FiltersError), #[error("Error: {0}")] DashboardError(#[from] DashboardError), } @@ -458,6 +460,7 @@ impl actix_web::ResponseError for PostError { PostError::NetworkError(_) => StatusCode::INTERNAL_SERVER_ERROR, PostError::ObjectStorageError(_) => StatusCode::INTERNAL_SERVER_ERROR, PostError::DashboardError(_) => StatusCode::INTERNAL_SERVER_ERROR, + PostError::FiltersError(_) => StatusCode::INTERNAL_SERVER_ERROR, } } diff --git a/server/src/handlers/http/modal/query_server.rs b/server/src/handlers/http/modal/query_server.rs index be66fe71e..efa570d21 100644 --- a/server/src/handlers/http/modal/query_server.rs +++ 
b/server/src/handlers/http/modal/query_server.rs @@ -125,6 +125,7 @@ impl QueryServer { .service(Server::get_logstream_webscope()) .service(Server::get_user_webscope()) .service(Server::get_dashboards_webscope()) + .service(Server::get_filters_webscope()) .service(Server::get_llm_webscope()) .service(Server::get_oauth_webscope(oidc_client)) .service(Server::get_user_role_webscope()) diff --git a/server/src/handlers/http/modal/server.rs b/server/src/handlers/http/modal/server.rs index f1b2afeb0..4d086c61e 100644 --- a/server/src/handlers/http/modal/server.rs +++ b/server/src/handlers/http/modal/server.rs @@ -24,6 +24,7 @@ use crate::handlers::http::base_path; use crate::handlers::http::health_check; use crate::handlers::http::query; use crate::handlers::http::users::dashboards; +use crate::handlers::http::users::filters; use crate::handlers::http::API_BASE_PATH; use crate::handlers::http::API_VERSION; use crate::localcache::LocalCacheManager; @@ -141,6 +142,7 @@ impl Server { .service(Self::get_logstream_webscope()) .service(Self::get_user_webscope()) .service(Self::get_dashboards_webscope()) + .service(Self::get_filters_webscope()) .service(Self::get_llm_webscope()) .service(Self::get_oauth_webscope(oidc_client)) .service(Self::get_user_role_webscope()), @@ -182,6 +184,33 @@ impl Server { ) } + // get the filters web scope + pub fn get_filters_webscope() -> Scope { + web::scope("/filters").service( + web::scope("/{user_id}") + .service( + web::resource("") + .route(web::get().to(filters::list).authorize(Action::ListFilter)), + ) + .service( + web::scope("/{filter_id}").service( + web::resource("") + .route(web::get().to(filters::get).authorize(Action::GetFilter)) + .route( + web::post() + .to(filters::post) + .authorize(Action::CreateFilter), + ) + .route( + web::delete() + .to(filters::delete) + .authorize(Action::DeleteFilter), + ), + ), + ), + ) + } + // get the query factory pub fn get_query_factory() -> Resource { web::resource("/query").route(web::post().to(query::query).authorize(Action::Query)) diff --git a/server/src/handlers/http/users/filters.rs b/server/src/handlers/http/users/filters.rs new file mode 100644 index 000000000..cc5225e03 --- /dev/null +++ b/server/src/handlers/http/users/filters.rs @@ -0,0 +1,165 @@ +use crate::{ + handlers::{http::ingest::PostError, STREAM_NAME_HEADER_KEY}, + option::CONFIG, + storage::{object_storage::filter_path, ObjectStorageError}, +}; +use actix_web::{http::header::ContentType, web, HttpRequest, HttpResponse, Responder}; +use bytes::Bytes; +use http::StatusCode; +use serde_json::{Error as SerdeError, Value as JsonValue}; + +pub async fn list(req: HttpRequest) -> Result { + let user_id = req + .match_info() + .get("user_id") + .ok_or(FiltersError::Metadata("No User Id Provided"))?; + + let stream_name = req + .headers() + .iter() + .find(|&(key, _)| key == STREAM_NAME_HEADER_KEY) + .ok_or_else(|| FiltersError::Metadata("Stream Name Not Provided"))? + .1 + .to_str() + .map_err(|_| FiltersError::Metadata("Non ASCII Stream Name Provided"))?; + + // .users/user_id/filters/stream_name/filter_id + let path = filter_path(user_id, stream_name, ""); + + let store = CONFIG.storage().get_object_store(); + let filters = store + .get_objects( + Some(&path), + Box::new(|file_name: String| file_name.ends_with("json")), + ) + .await?; + + let mut filt = vec![]; + for filter in filters { + filt.push(serde_json::from_slice::(&filter)?) 
+ } + + Ok((web::Json(filt), StatusCode::OK)) +} + +pub async fn get(req: HttpRequest) -> Result { + let user_id = req + .match_info() + .get("user_id") + .ok_or(FiltersError::Metadata("No User Id Provided"))?; + + let filt_id = req + .match_info() + .get("filter_id") + .ok_or(FiltersError::Metadata("No Filter Id Provided"))?; + + let stream_name = req + .headers() + .iter() + .find(|&(key, _)| key == STREAM_NAME_HEADER_KEY) + .ok_or_else(|| FiltersError::Metadata("Stream Name Not Provided"))? + .1 + .to_str() + .map_err(|_| FiltersError::Metadata("Non ASCII Stream Name Provided"))?; + + let path = filter_path(user_id, stream_name, &format!("{}.json", filt_id)); + let resource = CONFIG + .storage() + .get_object_store() + .get_object(&path) + .await?; + + let resource = serde_json::from_slice::(&resource)?; + + Ok((web::Json(resource), StatusCode::OK)) +} + +pub async fn post(req: HttpRequest, body: Bytes) -> Result { + let user_id = req + .match_info() + .get("user_id") + .ok_or(FiltersError::Metadata("No User Id Provided"))?; + + let filt_id = req + .match_info() + .get("filter_id") + .ok_or(FiltersError::Metadata("No Filter Id Provided"))?; + + let stream_name = req + .headers() + .iter() + .find(|&(key, _)| key == STREAM_NAME_HEADER_KEY) + .ok_or_else(|| FiltersError::Metadata("Stream Name Not Provided"))? + .1 + .to_str() + .map_err(|_| FiltersError::Metadata("Non ASCII Stream Name Provided"))?; + + let path = filter_path(user_id, stream_name, &format!("{}.json", filt_id)); + + let store = CONFIG.storage().get_object_store(); + store.put_object(&path, body).await?; + + Ok(HttpResponse::Ok().finish()) +} + +pub async fn delete(req: HttpRequest) -> Result { + let user_id = req + .match_info() + .get("user_id") + .ok_or(FiltersError::Metadata("No User Id Provided"))?; + + let filt_id = req + .match_info() + .get("filter_id") + .ok_or(FiltersError::Metadata("No Filter Id Provided"))?; + + let stream_name = req + .headers() + .iter() + .find(|&(key, _)| key == STREAM_NAME_HEADER_KEY) + .ok_or_else(|| FiltersError::Metadata("Stream Name Not Provided"))? 
+ .1 + .to_str() + .map_err(|_| FiltersError::Metadata("Non ASCII Stream Name Provided"))?; + + let path = filter_path(user_id, stream_name, &format!("{}.json", filt_id)); + let store = CONFIG.storage().get_object_store(); + store.delete_object(&path).await?; + + Ok(HttpResponse::Ok().finish()) +} + +#[derive(Debug, thiserror::Error)] +pub enum FiltersError { + #[error("Failed to connect to storage: {0}")] + ObjectStorage(#[from] ObjectStorageError), + #[error("Serde Error: {0}")] + Serde(#[from] SerdeError), + #[error("Cannot perform this operation: {0}")] + Metadata(&'static str), +} + +impl actix_web::ResponseError for FiltersError { + fn status_code(&self) -> http::StatusCode { + match self { + Self::ObjectStorage(_) => StatusCode::INTERNAL_SERVER_ERROR, + Self::Serde(_) => StatusCode::BAD_REQUEST, + Self::Metadata(_) => StatusCode::BAD_REQUEST, + } + } + + fn error_response(&self) -> actix_web::HttpResponse { + actix_web::HttpResponse::build(self.status_code()) + .insert_header(ContentType::plaintext()) + .body(self.to_string()) + } +} + +// #[derive(Debug, Serialize, Deserialize)] +// pub struct Filters { +// version: String, +// stream_name: String, +// filter_name: String, +// query: String, +// time-filter: `type_not_defined` +// } diff --git a/server/src/handlers/http/users/mod.rs b/server/src/handlers/http/users/mod.rs index 547da2fef..df6e9f2c4 100644 --- a/server/src/handlers/http/users/mod.rs +++ b/server/src/handlers/http/users/mod.rs @@ -1,4 +1,6 @@ pub mod dashboards; +pub mod filters; pub const USERS_ROOT_DIR: &str = ".users"; pub const DASHBOARDS_DIR: &str = "dashboards"; +pub const FILTER_DIR: &str = "filters"; diff --git a/server/src/rbac/role.rs b/server/src/rbac/role.rs index aced0c288..6253c4a8e 100644 --- a/server/src/rbac/role.rs +++ b/server/src/rbac/role.rs @@ -54,6 +54,10 @@ pub enum Action { GetDashboard, CreateDashboard, DeleteDashboard, + ListFilter, + GetFilter, + CreateFilter, + DeleteFilter, } #[derive(Debug, Clone, PartialEq, Eq, Hash)] @@ -115,6 +119,10 @@ impl RoleBuilder { | Action::GetDashboard | Action::CreateDashboard | Action::DeleteDashboard + | Action::GetFilter + | Action::ListFilter + | Action::CreateFilter + | Action::DeleteFilter | Action::GetAnalytics => Permission::Unit(action), Action::Ingest | Action::GetSchema diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs index f702d510e..4fbca6f48 100644 --- a/server/src/storage/object_storage.rs +++ b/server/src/storage/object_storage.rs @@ -27,7 +27,7 @@ use super::{ use crate::handlers::http::modal::ingest_server::INGESTOR_META; use crate::metrics::{LIFETIME_EVENTS_STORAGE_SIZE, STORAGE_SIZE_TODAY}; -use crate::handlers::http::users::{DASHBOARDS_DIR, USERS_ROOT_DIR}; +use crate::handlers::http::users::{DASHBOARDS_DIR, FILTER_DIR, USERS_ROOT_DIR}; use crate::option::Mode; use crate::{ alerts::Alerts, @@ -606,8 +606,20 @@ pub fn stream_json_path(stream_name: &str) -> RelativePathBuf { /// if dashboard_id is an empty str it should not append it to the rel path #[inline(always)] -pub fn dashboard_path(user_id: &str, dashboard_id: &str) -> RelativePathBuf { - RelativePathBuf::from_iter([USERS_ROOT_DIR, user_id, DASHBOARDS_DIR, dashboard_id]) +pub fn dashboard_path(user_id: &str, dashboard_file_name: &str) -> RelativePathBuf { + RelativePathBuf::from_iter([USERS_ROOT_DIR, user_id, DASHBOARDS_DIR, dashboard_file_name]) +} + +/// if filter_id is an empty str it should not append it to the rel path +#[inline(always)] +pub fn filter_path(user_id: &str, stream_name: 
&str, filter_file_name: &str) -> RelativePathBuf {
+    RelativePathBuf::from_iter([
+        USERS_ROOT_DIR,
+        user_id,
+        FILTER_DIR,
+        stream_name,
+        filter_file_name,
+    ])
+}
 
 /// path will be ".parseable/.parsable.json"
 #[inline(always)]
 pub fn parseable_json_path() -> RelativePathBuf {

From f121f7488e5ba4141f401621842ffa04bb1fd05c Mon Sep 17 00:00:00 2001
From: Eshan Chatterjee
Date: Tue, 7 May 2024 16:17:09 +0530
Subject: [PATCH 03/24] add cli args for query caching

---
 server/src/cli.rs | 32 ++++++++++++++++++++++++++++++++
 1 file changed, 32 insertions(+)

diff --git a/server/src/cli.rs b/server/src/cli.rs
index cd3f8cf7a..961274035 100644
--- a/server/src/cli.rs
+++ b/server/src/cli.rs
@@ -92,6 +92,12 @@ pub struct Cli {
 
     /// port use by airplane(flight query service)
     pub flight_port: u16,
+
+    /// to query cached data
+    pub query_cache_path: Option<PathBuf>,
+
+    /// Size for local cache
+    pub query_cache_size: u64,
 }
 
 impl Cli {
@@ -102,6 +108,8 @@ impl Cli {
     pub const DOMAIN_URI: &'static str = "origin";
     pub const STAGING: &'static str = "local-staging-path";
     pub const CACHE: &'static str = "cache-path";
+    pub const QUERY_CACHE: &'static str = "query-cache-path";
+    pub const QUERY_CACHE_SIZE: &'static str = "query-cache-size";
     pub const CACHE_SIZE: &'static str = "cache-size";
     pub const USERNAME: &'static str = "username";
     pub const PASSWORD: &'static str = "password";
@@ -191,6 +199,25 @@ impl Cli {
                 .next_line_help(true),
         )
+        .arg(
+            Arg::new(Self::QUERY_CACHE)
+                .long(Self::QUERY_CACHE)
+                .env("P_QUERY_CACHE_DIR")
+                .value_name("DIR")
+                .value_parser(validation::canonicalize_path)
+                .help("Local path on this device to be used for caching data")
+                .next_line_help(true),
+        )
+        .arg(
+            Arg::new(Self::QUERY_CACHE_SIZE)
+                .long(Self::QUERY_CACHE_SIZE)
+                .env("P_QUERY_CACHE_SIZE")
+                .value_name("size")
+                .default_value("1GiB")
+                .value_parser(validation::cache_size)
+                .help("Maximum allowed cache size for all streams combined (In human readable format, e.g 1GiB, 2GiB, 100MB)")
+                .next_line_help(true),
+        )
         .arg(
             Arg::new(Self::USERNAME)
                 .long(Self::USERNAME)
@@ -372,6 +399,7 @@ impl FromArgMatches for Cli {
 
     fn update_from_arg_matches(&mut self, m: &clap::ArgMatches) -> Result<(), clap::Error> {
         self.local_cache_path = m.get_one::<PathBuf>(Self::CACHE).cloned();
+        self.query_cache_path = m.get_one::<PathBuf>(Self::QUERY_CACHE).cloned();
         self.tls_cert_path = m.get_one::<PathBuf>(Self::TLS_CERT).cloned();
         self.tls_key_path = m.get_one::<PathBuf>(Self::TLS_KEY).cloned();
         self.domain_address = m.get_one::<Url>(Self::DOMAIN_URI).cloned();
@@ -394,6 +422,10 @@ impl FromArgMatches for Cli {
             .get_one::<u64>(Self::CACHE_SIZE)
             .cloned()
             .expect("default value for cache size");
+        self.query_cache_size = m
+            .get_one(Self::QUERY_CACHE_SIZE)
+            .cloned()
+            .expect("default value for query cache size");
         self.username = m
             .get_one::<String>(Self::USERNAME)
             .cloned()
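The two flags added above follow the existing cache-path/cache-size pattern: `--query-cache-path` (env: P_QUERY_CACHE_DIR) must point at a writable local directory, and `--query-cache-size` (env: P_QUERY_CACHE_SIZE) accepts the same human-readable sizes as the local cache (e.g. 1GiB, 2GiB, 100MB), defaulting to 1GiB. A hypothetical invocation (binary name, subcommand, and paths are illustrative, not taken from these patches):

    P_QUERY_CACHE_DIR=/var/parseable/query-cache \
    P_QUERY_CACHE_SIZE=2GiB \
    parseable s3-store

or equivalently via flags: parseable s3-store --query-cache-path /var/parseable/query-cache --query-cache-size 2GiB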
.expect("serialize cannot fail") diff --git a/server/src/query.rs b/server/src/query.rs index ce80ba2e1..f6a984537 100644 --- a/server/src/query.rs +++ b/server/src/query.rs @@ -186,6 +186,9 @@ impl TableScanVisitor { pub fn into_inner(self) -> Vec { self.tables } + pub fn top(&self) -> &str { + self.tables[0].as_ref() + } } impl TreeNodeVisitor for TableScanVisitor { diff --git a/server/src/storage/staging.rs b/server/src/storage/staging.rs index b30c8689c..82c819082 100644 --- a/server/src/storage/staging.rs +++ b/server/src/storage/staging.rs @@ -293,7 +293,7 @@ pub fn convert_disk_files_to_parquet( } } -fn parquet_writer_props( +pub fn parquet_writer_props( time_partition: Option, index_time_partition: usize, custom_partition_fields: HashMap, diff --git a/server/src/utils.rs b/server/src/utils.rs index 82f4f226e..84b604a51 100644 --- a/server/src/utils.rs +++ b/server/src/utils.rs @@ -280,7 +280,7 @@ pub fn get_ingestor_id() -> String { let result = format!("{:x}", hasher.finalize()); let result = result.split_at(15).0.to_string(); log::debug!("Ingestor ID: {}", &result); - result.to_string() + result } #[cfg(test)] From e654177c3ad6a723443dec025960c7e10998b549 Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Tue, 7 May 2024 16:18:47 +0530 Subject: [PATCH 05/24] update errors --- server/src/handlers/http/query.rs | 2 ++ server/src/localcache.rs | 7 ++++++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/server/src/handlers/http/query.rs b/server/src/handlers/http/query.rs index 534db8075..854dda45e 100644 --- a/server/src/handlers/http/query.rs +++ b/server/src/handlers/http/query.rs @@ -290,6 +290,8 @@ pub enum QueryError { Execute(#[from] ExecuteError), #[error("ObjectStorage Error: {0}")] ObjectStorage(#[from] ObjectStorageError), + #[error("Cache Error: {0}")] + CacheError(#[from] CacheError), #[error("Evern Error: {0}")] EventError(#[from] EventError), #[error("Error: {0}")] diff --git a/server/src/localcache.rs b/server/src/localcache.rs index 9ba0c15f8..22b817646 100644 --- a/server/src/localcache.rs +++ b/server/src/localcache.rs @@ -25,9 +25,10 @@ use human_size::{Byte, Gigibyte, SpecificSize}; use itertools::{Either, Itertools}; use object_store::{local::LocalFileSystem, ObjectStore}; use once_cell::sync::OnceCell; +use parquet::errors::ParquetError; use tokio::{fs, sync::Mutex}; -use crate::option::CONFIG; +use crate::{metadata::error::stream_info::MetadataError, option::CONFIG}; pub const STREAM_CACHE_FILENAME: &str = ".cache.json"; pub const CACHE_META_FILENAME: &str = ".cache_meta.json"; @@ -256,4 +257,8 @@ pub enum CacheError { MoveError(#[from] fs_extra::error::Error), #[error("{0}")] ObjectStoreError(#[from] object_store::Error), + #[error("{0}")] + ParquetError(#[from] ParquetError), + #[error("{0}")] + MetadataError(#[from] MetadataError), } From e081a1b6e02290a3f522e683cb8b310d2e073db0 Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Tue, 7 May 2024 16:19:47 +0530 Subject: [PATCH 06/24] refactor: improve time parsing logic in query handler --- server/src/handlers/http/query.rs | 37 +++++++++++++++++++------------ 1 file changed, 23 insertions(+), 14 deletions(-) diff --git a/server/src/handlers/http/query.rs b/server/src/handlers/http/query.rs index 854dda45e..b9fedd269 100644 --- a/server/src/handlers/http/query.rs +++ b/server/src/handlers/http/query.rs @@ -200,20 +200,7 @@ pub async fn into_query( return Err(QueryError::EmptyEndTime); } - let start: DateTime; - let end: DateTime; - - if query.end_time == "now" { - end = Utc::now(); - start 
From e081a1b6e02290a3f522e683cb8b310d2e073db0 Mon Sep 17 00:00:00 2001
From: Eshan Chatterjee
Date: Tue, 7 May 2024 16:19:47 +0530
Subject: [PATCH 06/24] refactor: improve time parsing logic in query handler

---
 server/src/handlers/http/query.rs | 37 +++++++++++++++++++------------
 1 file changed, 23 insertions(+), 14 deletions(-)

diff --git a/server/src/handlers/http/query.rs b/server/src/handlers/http/query.rs
index 854dda45e..b9fedd269 100644
--- a/server/src/handlers/http/query.rs
+++ b/server/src/handlers/http/query.rs
@@ -200,20 +200,7 @@ pub async fn into_query(
         return Err(QueryError::EmptyEndTime);
     }
 
-    let start: DateTime<Utc>;
-    let end: DateTime<Utc>;
-
-    if query.end_time == "now" {
-        end = Utc::now();
-        start = end - chrono::Duration::from_std(humantime::parse_duration(&query.start_time)?)?;
-    } else {
-        start = DateTime::parse_from_rfc3339(&query.start_time)
-            .map_err(|_| QueryError::StartTimeParse)?
-            .into();
-        end = DateTime::parse_from_rfc3339(&query.end_time)
-            .map_err(|_| QueryError::EndTimeParse)?
-            .into();
-    };
+    let (start, end) = parse_human_time(&query.start_time, &query.end_time)?;
 
     if start.timestamp() > end.timestamp() {
         return Err(QueryError::StartTimeAfterEndTime);
@@ -227,6 +214,28 @@ pub async fn into_query(
     })
 }
 
+fn parse_human_time(
+    start_time: &str,
+    end_time: &str,
+) -> Result<(DateTime<Utc>, DateTime<Utc>), QueryError> {
+    let start: DateTime<Utc>;
+    let end: DateTime<Utc>;
+
+    if end_time == "now" {
+        end = Utc::now();
+        start = end - chrono::Duration::from_std(humantime::parse_duration(start_time)?)?;
+    } else {
+        start = DateTime::parse_from_rfc3339(start_time)
+            .map_err(|_| QueryError::StartTimeParse)?
+            .into();
+        end = DateTime::parse_from_rfc3339(end_time)
+            .map_err(|_| QueryError::EndTimeParse)?
+            .into();
+    };
+
+    Ok((start, end))
+}
+
 /// unused for now, might need it in the future
 #[allow(unused)]
 fn transform_query_for_ingestor(query: &Query) -> Option<Query> {

From 455dc8196affb4e159ee7aedb2b49beef594df48 Mon Sep 17 00:00:00 2001
From: Eshan Chatterjee
Date: Tue, 7 May 2024 16:20:10 +0530
Subject: [PATCH 07/24] impl query result caching

---
 server/Cargo.toml                 |   9 +-
 server/src/handlers/http/query.rs |  46 +++++-
 server/src/main.rs                |   1 +
 server/src/querycache.rs          | 338 ++++++++++++++++++++++++++++++
 4 files changed, 389 insertions(+), 5 deletions(-)
 create mode 100644 server/src/querycache.rs

diff --git a/server/Cargo.toml b/server/Cargo.toml
index 5d9eb69e2..9ca5b794b 100644
--- a/server/Cargo.toml
+++ b/server/Cargo.toml
@@ -9,13 +9,14 @@ build = "build.rs"
 
 [dependencies]
 ### apache arrow/datafusion dependencies
+# arrow = "51.0.0"
 arrow-schema = { version = "51.0.0", features = ["serde"] }
 arrow-array = { version = "51.0.0" }
 arrow-json = "51.0.0"
 arrow-ipc = { version = "51.0.0", features = ["zstd"] }
 arrow-select = "51.0.0"
 datafusion = "37.1.0"
-object_store = { version = "0.9.1", features = ["cloud", "aws"] }
+object_store = { version = "0.9.1", features = ["cloud", "aws"] } # cannot update object_store as datafusion has not caught up
 parquet = "51.0.0"
 arrow-flight = { version = "51.0.0", features = [ "tls" ] }
 tonic = {version = "0.11.0", features = ["tls", "transport", "gzip", "zstd"] }
@@ -72,11 +73,11 @@ relative-path = { version = "1.7", features = ["serde"] }
 reqwest = { version = "0.11.27", default_features = false, features = [
     "rustls-tls",
     "json",
-] }
-rustls = "0.22.4"
+] } # cannot update cause rustls is not latest `see rustls`
+rustls = "0.22.4" # cannot update to 0.23 actix has not caught up yet
 rustls-pemfile = "2.1.2"
 semver = "1.0"
-serde = { version = "1.0", features = ["rc"] }
+serde = { version = "1.0", features = ["rc", "derive"] }
 serde_json = "1.0"
 static-files = "0.2"
 sysinfo = "0.30.11"
diff --git a/server/src/handlers/http/query.rs b/server/src/handlers/http/query.rs
index b9fedd269..f6db4e5c9 100644
--- a/server/src/handlers/http/query.rs
+++ b/server/src/handlers/http/query.rs
@@ -34,11 +34,13 @@ use crate::event::error::EventError;
 use crate::handlers::http::fetch_schema;
 
 use crate::event::commit_schema;
+use crate::localcache::CacheError;
 use crate::metrics::QUERY_EXECUTE_TIME;
 use crate::option::{Mode, CONFIG};
 use crate::query::error::ExecuteError;
 use crate::query::Query as LogicalQuery;
 use crate::query::{TableScanVisitor, QUERY_SESSION};
+use crate::querycache::QueryCacheManager; use crate::rbac::role::{Action, Permission}; use crate::rbac::Users; use crate::response::QueryResponse; @@ -50,7 +52,7 @@ use crate::utils::actix::extract_session_key_from_req; #[derive(Debug, serde::Deserialize, serde::Serialize)] #[serde(rename_all = "camelCase")] pub struct Query { - pub query: String, + pub pub query: String, pub start_time: String, pub end_time: String, #[serde(default)] @@ -72,6 +74,36 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result Result, +} + +impl QueryCache { + fn new() -> Self { + Self { + version: CURRENT_QUERY_CACHE_VERSION.to_string(), + current_size: 0, + files: Cache::new(100), + } + } + + pub fn get_file(&mut self, key: String) -> Option { + self.files.get(&key).cloned() + } + + // read the parquet + // return the recordbatches + pub async fn get_cached_records( + &self, + path: &PathBuf, + ) -> Result<(Vec, Vec), CacheError> { + let file = AsyncFs::File::open(path).await?; + let builder = ParquetRecordBatchStreamBuilder::new(file).await?; + // Build a async parquet reader. + let stream = builder.build()?; + + let records = stream.try_collect::>().await?; + let fields = records.first().map_or_else(Vec::new, |record| { + record + .schema() + .fields() + .iter() + .map(|field| field.name()) + .cloned() + .collect_vec() + }); + + Ok((records, fields)) + } +} + +#[derive(Debug, serde::Deserialize, serde::Serialize)] +pub struct QueryCacheMeta { + version: String, + size_capacity: u64, +} + +impl QueryCacheMeta { + fn new() -> Self { + Self { + version: CURRENT_QUERY_CACHE_VERSION.to_string(), + size_capacity: 0, + } + } +} + +pub struct QueryCacheManager { + filesystem: LocalFileSystem, + cache_path: PathBuf, + cache_capacity: u64, + semaphore: Mutex<()>, +} + +impl QueryCacheManager { + pub fn gen_file_path(query_staging_path: &str, stream: &str) -> PathBuf { + PathBuf::from_iter([ + query_staging_path, + stream, + &format!( + "{}.{}.parquet", + hostname_unchecked(), + Utc::now().to_rfc3339() + ), + ]) + } + pub async fn global(config_capacity: u64) -> Result, CacheError> { + static INSTANCE: OnceCell = OnceCell::new(); + + let cache_path = CONFIG.parseable.query_cache_path.as_ref(); + + if cache_path.is_none() { + return Ok(None); + } + + let cache_path = cache_path.unwrap(); + + let cache_manager = INSTANCE.get_or_init(|| { + let cache_path = cache_path.clone(); + std::fs::create_dir_all(&cache_path).unwrap(); + Self { + filesystem: LocalFileSystem::new(), + cache_path, + cache_capacity: CONFIG.parseable.query_cache_size, + semaphore: Mutex::new(()), + } + }); + + cache_manager.validate(config_capacity).await?; + + Ok(Some(cache_manager)) + } + + async fn validate(&self, config_capacity: u64) -> Result<(), CacheError> { + fs::create_dir_all(&self.cache_path).await?; + let path = query_cache_meta_path(&self.cache_path) + .map_err(|err| CacheError::ObjectStoreError(err.into()))?; + let resp = self + .filesystem + .get(&path) + .and_then(|resp| resp.bytes()) + .await; + + let updated_cache = match resp { + Ok(bytes) => { + let mut meta: QueryCacheMeta = serde_json::from_slice(&bytes)?; + if meta.size_capacity != config_capacity { + // log the change in cache size + let configured_size_human: SpecificSize = + SpecificSize::new(config_capacity as f64, Byte) + .unwrap() + .into(); + let current_size_human: SpecificSize = + SpecificSize::new(meta.size_capacity as f64, Byte) + .unwrap() + .into(); + log::warn!( + "Cache size is updated from {} to {}", + current_size_human, + configured_size_human 
+ ); + meta.size_capacity = config_capacity; + Some(meta) + } else { + None + } + } + Err(object_store::Error::NotFound { .. }) => { + let mut meta = QueryCacheMeta::new(); + meta.size_capacity = config_capacity; + Some(meta) + } + Err(err) => return Err(err.into()), + }; + + if let Some(updated_cache) = updated_cache { + let result = self + .filesystem + .put(&path, serde_json::to_vec(&updated_cache)?.into()) + .await?; + log::info!("Cache meta file updated: {:?}", result); + } + + Ok(()) + } + + pub async fn get_cache(&self, stream: &str) -> Result { + let path = query_cache_file_path(&self.cache_path, stream).unwrap(); + let res = self + .filesystem + .get(&path) + .and_then(|resp| resp.bytes()) + .await; + let cache = match res { + Ok(bytes) => serde_json::from_slice(&bytes)?, + Err(object_store::Error::NotFound { .. }) => QueryCache::new(), + Err(err) => return Err(err.into()), + }; + Ok(cache) + } + + pub async fn put_cache(&self, stream: &str, cache: &QueryCache) -> Result<(), CacheError> { + let path = query_cache_file_path(&self.cache_path, stream).unwrap(); + let bytes = serde_json::to_vec(cache)?.into(); + let result = self.filesystem.put(&path, bytes).await?; + log::info!("Cache file updated: {:?}", result); + Ok(()) + } + + pub async fn move_to_cache( + &self, + stream: &str, + key: String, + staging_path: &Path, + ) -> Result<(), CacheError> { + let lock = self.semaphore.lock().await; + let mut cache_path = self.cache_path.join(stream); + fs::create_dir_all(&cache_path).await?; + cache_path.push(staging_path.file_name().unwrap()); + // this needs to be the record batches parquet + // fs_extra::file::move_file(staging_path, &cache_path, &self.copy_options)?; + let file_size = std::fs::metadata(&cache_path)?.len(); + let mut cache = self.get_cache(stream).await?; + + while cache.current_size + file_size > self.cache_capacity { + if let Some((_, file_for_removal)) = cache.files.pop_lru() { + let lru_file_size = std::fs::metadata(&file_for_removal)?.len(); + cache.current_size = cache.current_size.saturating_sub(lru_file_size); + log::info!("removing cache entry"); + tokio::spawn(fs::remove_file(file_for_removal)); + } else { + log::error!("Cache size too small"); + break; + } + } + + if cache.files.is_full() { + cache.files.resize(cache.files.capacity() * 2); + } + cache.files.push(key, cache_path); + cache.current_size += file_size; + self.put_cache(stream, &cache).await?; + drop(lock); + Ok(()) + } + + pub async fn create_parquet_cache( + &self, + table_name: &str, + records: &[RecordBatch], + start: String, + end: String, + query: String, + ) -> Result<(), CacheError> { + let parquet_path = Self::gen_file_path( + self.cache_path.to_str().expect("utf-8 compat path"), + table_name, + ); + AsyncFs::create_dir_all(parquet_path.parent().expect("parent path exists")).await?; + let parquet_file = AsyncFs::File::create(&parquet_path).await?; + let time_partition = STREAM_INFO.get_time_partition(table_name)?; + let props = parquet_writer_props(time_partition.clone(), 0).build(); + + let mut arrow_writer = AsyncArrowWriter::try_new( + parquet_file, + STREAM_INFO.schema(table_name).expect("schema present"), + Some(props), + )?; + + for record in records { + if let Err(e) = arrow_writer.write(record).await { + log::error!("Error While Writing to Query Cache: {}", e); + } + } + + arrow_writer.close().await?; + self.move_to_cache( + table_name, + format!("{}-{}-{}", start, end, query), + &parquet_path, + ) + .await + // match AsyncArrowWriter::try_new( + // parquet_file, + // 
STREAM_INFO.schema(&table_name).expect("schema present"), + // Some(props), + // ) { + // Ok(mut writer) => { + // for record in records { + // if let Err(e) = writer.write(record).await { + // log::error!("Error While Writing to Query Cache: {}", e); + // err_flag = true; + // } + // } + // + // if let Err(e) = writer.close().await { + // log::error!( + // "Unstable State: Async ArrowWriter Faild to close. Error: {}", + // e + // ); + // + // err_flag = true; + // } + // } + // Err(err) => { + // log::error!("Failed to create an Async ArrowWriter. Reason: {}", err); + // err_flag = true; + // } + // } + // + // if err_flag { + // log::error!("Failed to cache query result"); + // } else { + // + // } + } +} + +fn query_cache_file_path( + root: impl AsRef, + stream: &str, +) -> Result { + let mut path = root.as_ref().join(stream); + path.push(QUERY_CACHE_FILENAME); + object_store::path::Path::from_absolute_path(path) +} + +fn query_cache_meta_path( + root: impl AsRef, +) -> Result { + let path = root.as_ref().join(QUERY_CACHE_META_FILENAME); + object_store::path::Path::from_absolute_path(path) +} From 35118c82a1b0629c579284ac67a86f6346bbd3a2 Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Mon, 13 May 2024 17:12:53 +0530 Subject: [PATCH 08/24] update querying with cache --- server/src/handlers.rs | 2 + server/src/handlers/http/query.rs | 108 ++++++++++++++++++++---------- server/src/query.rs | 4 +- 3 files changed, 75 insertions(+), 39 deletions(-) diff --git a/server/src/handlers.rs b/server/src/handlers.rs index d0c11690f..9e3f181ac 100644 --- a/server/src/handlers.rs +++ b/server/src/handlers.rs @@ -23,6 +23,8 @@ pub mod livetail; const PREFIX_TAGS: &str = "x-p-tag-"; const PREFIX_META: &str = "x-p-meta-"; const STREAM_NAME_HEADER_KEY: &str = "x-p-stream"; +const CACHE_RESULTS_HEADER_KEY: &str = "x-p-cache-results"; +const CACHE_VIEW_HEADER_KEY: &str = "x-p-show-cached"; const LOG_SOURCE_KEY: &str = "x-p-log-source"; const TIME_PARTITION_KEY: &str = "x-p-time-partition"; const TIME_PARTITION_LIMIT_KEY: &str = "x-p-time-partition-limit"; diff --git a/server/src/handlers/http/query.rs b/server/src/handlers/http/query.rs index f6db4e5c9..abf633908 100644 --- a/server/src/handlers/http/query.rs +++ b/server/src/handlers/http/query.rs @@ -34,6 +34,7 @@ use crate::event::error::EventError; use crate::handlers::http::fetch_schema; use crate::event::commit_schema; +use crate::handlers::{CACHE_RESULTS_HEADER_KEY, CACHE_VIEW_HEADER_KEY}; use crate::localcache::CacheError; use crate::metrics::QUERY_EXECUTE_TIME; use crate::option::{Mode, CONFIG}; @@ -74,36 +75,59 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result {} + (None, Some(_)) => {} + (Some(_), None) => { + log::warn!( + "Instructed to show cached results but Query Caching is not Enabledon Server" + ); + } + (Some(_), Some(query_cache_manager)) => { + let mut query_cache = query_cache_manager.get_cache(stream).await?; + + let (start, end) = + parse_human_time(&query_request.start_time, &query_request.end_time)?; + let key = format!( + "{}-{}-{}", + start.to_rfc3339(), + end.to_rfc3339(), + query_request.query.clone() + ); + + let file_path = query_cache.get_file(key); + if let Some(file_path) = file_path { + let (records, fields) = query_cache.get_cached_records(&file_path).await?; + let response = QueryResponse { + records, + fields, + fill_null: query_request.send_null, + with_fields: query_request.fields, + } + .to_http()?; - return Ok(response); + return Ok(response); + } } - }; + } let tables = 
visitor.into_inner(); @@ -125,23 +149,33 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result {} + (None, Some(_)) => {} + (Some(_), None) => { + log::warn!( + "Instructed to cache query results but Query Caching is not Enabled in Server" + ); + } + // do cache + (Some(_), Some(query_cache_manager)) => { + query_cache_manager + .create_parquet_cache( + &table_name, + &records, + query.start.to_rfc3339(), + query.end.to_rfc3339(), + query_request.query, + ) + .await? + } } let response = QueryResponse { @@ -348,7 +382,7 @@ pub enum QueryError { #[error("Evern Error: {0}")] EventError(#[from] EventError), #[error("Error: {0}")] - MalformedQuery(String), + MalformedQuery(&'static str), #[error( r#"Error: Failed to Parse Record Batch into Json Description: {0}"# diff --git a/server/src/query.rs b/server/src/query.rs index f6a984537..7239467c0 100644 --- a/server/src/query.rs +++ b/server/src/query.rs @@ -186,8 +186,8 @@ impl TableScanVisitor { pub fn into_inner(self) -> Vec { self.tables } - pub fn top(&self) -> &str { - self.tables[0].as_ref() + pub fn top(&self) -> Option<&str> { + self.tables.first().map(|s| s.as_ref()) } } From 89428f14fce219344304108b21d23caf21114340 Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Tue, 14 May 2024 17:04:30 +0530 Subject: [PATCH 09/24] chore: clean up --- server/src/catalog.rs | 3 +-- server/src/event/writer/file_writer.rs | 9 ++------- server/src/handlers/http/ingest.rs | 6 +++++- server/src/handlers/http/query.rs | 2 +- server/src/handlers/http/users/dashboards.rs | 8 +++++++- server/src/handlers/http/users/filters.rs | 2 +- server/src/query/listing_table_builder.rs | 1 - server/src/query/stream_schema_provider.rs | 4 ++-- server/src/response.rs | 4 ++-- server/src/storage.rs | 6 +++++- server/src/storage/store_metadata.rs | 9 +++++---- 11 files changed, 31 insertions(+), 23 deletions(-) diff --git a/server/src/catalog.rs b/server/src/catalog.rs index c88d4c9a5..304752ccf 100644 --- a/server/src/catalog.rs +++ b/server/src/catalog.rs @@ -230,8 +230,7 @@ async fn create_manifest( .ok_or(IOError::new( ErrorKind::Other, "Failed to create upper bound for manifest", - )) - .map_err(ObjectStorageError::IoError)?, + ))?, ) .and_utc(); diff --git a/server/src/event/writer/file_writer.rs b/server/src/event/writer/file_writer.rs index 488f8da8a..0b990421d 100644 --- a/server/src/event/writer/file_writer.rs +++ b/server/src/event/writer/file_writer.rs @@ -49,10 +49,7 @@ impl FileWriter { ) -> Result<(), StreamWriterError> { match self.get_mut(schema_key) { Some(writer) => { - writer - .writer - .write(record) - .map_err(StreamWriterError::Writer)?; + writer.writer.write(record)?; } // entry is not present thus we create it None => { @@ -100,8 +97,6 @@ fn init_new_stream_writer_file( let mut stream_writer = StreamWriter::try_new(file, &record.schema()) .expect("File and RecordBatch both are checked"); - stream_writer - .write(record) - .map_err(StreamWriterError::Writer)?; + stream_writer.write(record)?; Ok((path, stream_writer)) } diff --git a/server/src/handlers/http/ingest.rs b/server/src/handlers/http/ingest.rs index b442b6efc..25750ea8f 100644 --- a/server/src/handlers/http/ingest.rs +++ b/server/src/handlers/http/ingest.rs @@ -170,7 +170,7 @@ async fn push_logs(stream_name: String, req: HttpRequest, body: Bytes) -> Result let object_store_format = glob_storage .get_object_store_format(&stream_name) .await - .map_err(|_err| PostError::StreamNotFound(stream_name.clone()))?; + .map_err(|_| PostError::StreamNotFound(stream_name.clone()))?; 
let time_partition = object_store_format.time_partition; let time_partition_limit = object_store_format.time_partition_limit; @@ -433,6 +433,9 @@ pub enum PostError { #[error("{0}")] CreateStream(#[from] CreateStreamError), #[error("Error: {0}")] + Error(std::io::Error), + #[allow(unused)] + #[error("Error: {0}")] CustomError(String), #[error("Error: {0}")] NetworkError(#[from] reqwest::Error), @@ -461,6 +464,7 @@ impl actix_web::ResponseError for PostError { PostError::ObjectStorageError(_) => StatusCode::INTERNAL_SERVER_ERROR, PostError::DashboardError(_) => StatusCode::INTERNAL_SERVER_ERROR, PostError::FiltersError(_) => StatusCode::INTERNAL_SERVER_ERROR, + PostError::Error(_) => StatusCode::INTERNAL_SERVER_ERROR, } } diff --git a/server/src/handlers/http/query.rs b/server/src/handlers/http/query.rs index abf633908..7018ede05 100644 --- a/server/src/handlers/http/query.rs +++ b/server/src/handlers/http/query.rs @@ -53,7 +53,7 @@ use crate::utils::actix::extract_session_key_from_req; #[derive(Debug, serde::Deserialize, serde::Serialize)] #[serde(rename_all = "camelCase")] pub struct Query { - pub pub query: String, + pub query: String, pub start_time: String, pub end_time: String, #[serde(default)] diff --git a/server/src/handlers/http/users/dashboards.rs b/server/src/handlers/http/users/dashboards.rs index 2478b1dfa..d33b1600e 100644 --- a/server/src/handlers/http/users/dashboards.rs +++ b/server/src/handlers/http/users/dashboards.rs @@ -98,7 +98,7 @@ pub async fn delete(req: HttpRequest) -> Result { // version: String, // name: String, // id: String, -// time-filter: `type_not_defined` +// time_filter: TimeFilter // refresh_interval: u64, // pannels: Vec, // } @@ -112,6 +112,12 @@ pub async fn delete(req: HttpRequest) -> Result { // headers: Vec, // dimensions: (u64, u64), // } +// +// #[derive(Debug, Serialize, Deserialize)] +// pub struct TimeFilter { +// to: String, +// from: String +// } #[derive(Debug, thiserror::Error)] pub enum DashboardError { diff --git a/server/src/handlers/http/users/filters.rs b/server/src/handlers/http/users/filters.rs index cc5225e03..2e4c4d1f3 100644 --- a/server/src/handlers/http/users/filters.rs +++ b/server/src/handlers/http/users/filters.rs @@ -161,5 +161,5 @@ impl actix_web::ResponseError for FiltersError { // stream_name: String, // filter_name: String, // query: String, -// time-filter: `type_not_defined` +// time-filter: TimeFilter // } diff --git a/server/src/query/listing_table_builder.rs b/server/src/query/listing_table_builder.rs index ee5b8c0d1..278ed5783 100644 --- a/server/src/query/listing_table_builder.rs +++ b/server/src/query/listing_table_builder.rs @@ -167,7 +167,6 @@ impl ListingTableBuilder { }) .try_collect() .await - // TODO: make the err map better .map_err(|err| DataFusionError::External(Box::new(err)))?; let mut res = res.into_iter().flatten().collect_vec(); diff --git a/server/src/query/stream_schema_provider.rs b/server/src/query/stream_schema_provider.rs index dccb3aa84..7100ac95a 100644 --- a/server/src/query/stream_schema_provider.rs +++ b/server/src/query/stream_schema_provider.rs @@ -180,8 +180,8 @@ async fn collect_from_snapshot( .map(|item| item.manifest_path) .collect(), ) - .await - .map_err(DataFusionError::ObjectStore)?; + .await?; + let mut manifest_files: Vec<_> = manifest_files .into_iter() .flat_map(|file| file.files) diff --git a/server/src/response.rs b/server/src/response.rs index 0f5deb5ec..6ea07bda4 100644 --- a/server/src/response.rs +++ b/server/src/response.rs @@ -33,8 +33,8 @@ impl QueryResponse { pub 
fn to_http(&self) -> Result { log::info!("{}", "Returning query results"); let records: Vec<&RecordBatch> = self.records.iter().collect(); - let mut json_records = record_batches_to_json(&records) - .map_err(|err| QueryError::JsonParse(err.to_string()))?; + let mut json_records = record_batches_to_json(&records)?; + if self.fill_null { for map in &mut json_records { for field in &self.fields { diff --git a/server/src/storage.rs b/server/src/storage.rs index cf25ad826..d28dcb00e 100644 --- a/server/src/storage.rs +++ b/server/src/storage.rs @@ -16,7 +16,9 @@ * */ -use crate::{catalog::snapshot::Snapshot, stats::FullStats}; +use crate::{ + catalog::snapshot::Snapshot, metadata::error::stream_info::MetadataError, stats::FullStats, +}; use chrono::Local; @@ -209,6 +211,8 @@ pub enum ObjectStorageError { UnhandledError(Box), #[error("Error: {0}")] PathError(relative_path::FromPathError), + #[error("Error: {0}")] + MetadataError(#[from] MetadataError), #[allow(dead_code)] #[error("Authentication Error: {0}")] diff --git a/server/src/storage/store_metadata.rs b/server/src/storage/store_metadata.rs index 44ae55868..d3ecd4040 100644 --- a/server/src/storage/store_metadata.rs +++ b/server/src/storage/store_metadata.rs @@ -116,9 +116,7 @@ pub async fn resolve_parseable_metadata() -> Result Result { // if server is started in ingest mode,we need to make sure that query mode has been started // i.e the metadata is updated to reflect the server mode = Query - if Mode::from_string(&metadata.server_mode).map_err(ObjectStorageError::Custom)? == Mode::All && CONFIG.parseable.mode == Mode::Ingest { + if Mode::from_string(&metadata.server_mode) + .map_err(ObjectStorageError::Custom) + ? + == Mode::All && CONFIG.parseable.mode == Mode::Ingest { Err("Starting Ingest Mode is not allowed, Since Query Server has not been started yet") } else { create_dir_all(CONFIG.staging_dir())?; From 810ad546f930e13a0fb91acfdb1573e1166f4163 Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Tue, 14 May 2024 17:05:00 +0530 Subject: [PATCH 10/24] impl query caching --- server/src/handlers.rs | 1 + server/src/handlers/http/query.rs | 136 ++++++++++++------------------ server/src/querycache.rs | 106 ++++++++++++----------- 3 files changed, 108 insertions(+), 135 deletions(-) diff --git a/server/src/handlers.rs b/server/src/handlers.rs index 9e3f181ac..5d173e1b4 100644 --- a/server/src/handlers.rs +++ b/server/src/handlers.rs @@ -25,6 +25,7 @@ const PREFIX_META: &str = "x-p-meta-"; const STREAM_NAME_HEADER_KEY: &str = "x-p-stream"; const CACHE_RESULTS_HEADER_KEY: &str = "x-p-cache-results"; const CACHE_VIEW_HEADER_KEY: &str = "x-p-show-cached"; +const USER_ID_HEADER_KEY: &str = "x-p-user-id"; const LOG_SOURCE_KEY: &str = "x-p-log-source"; const TIME_PARTITION_KEY: &str = "x-p-time-partition"; const TIME_PARTITION_LIMIT_KEY: &str = "x-p-time-partition-limit"; diff --git a/server/src/handlers/http/query.rs b/server/src/handlers/http/query.rs index 7018ede05..c5fa0aa7a 100644 --- a/server/src/handlers/http/query.rs +++ b/server/src/handlers/http/query.rs @@ -19,12 +19,13 @@ use actix_web::http::header::ContentType; use actix_web::web::{self, Json}; use actix_web::{FromRequest, HttpRequest, Responder}; +use anyhow::anyhow; use chrono::{DateTime, Utc}; use datafusion::common::tree_node::TreeNode; use datafusion::error::DataFusionError; use datafusion::execution::context::SessionState; use futures_util::Future; -use http::StatusCode; +use http::{HeaderValue, StatusCode}; use std::collections::HashMap; use std::pin::Pin; use 
std::sync::Arc; @@ -32,9 +33,10 @@ use std::time::Instant; use crate::event::error::EventError; use crate::handlers::http::fetch_schema; +use arrow_array::RecordBatch; use crate::event::commit_schema; -use crate::handlers::{CACHE_RESULTS_HEADER_KEY, CACHE_VIEW_HEADER_KEY}; +use crate::handlers::{CACHE_RESULTS_HEADER_KEY, CACHE_VIEW_HEADER_KEY, USER_ID_HEADER_KEY}; use crate::localcache::CacheError; use crate::metrics::QUERY_EXECUTE_TIME; use crate::option::{Mode, CONFIG}; @@ -83,100 +85,59 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result {} - (None, Some(_)) => {} - (Some(_), None) => { - log::warn!( - "Instructed to show cached results but Query Caching is not Enabledon Server" - ); - } - (Some(_), Some(query_cache_manager)) => { - let mut query_cache = query_cache_manager.get_cache(stream).await?; - - let (start, end) = - parse_human_time(&query_request.start_time, &query_request.end_time)?; - let key = format!( - "{}-{}-{}", - start.to_rfc3339(), - end.to_rfc3339(), - query_request.query.clone() - ); - - let file_path = query_cache.get_file(key); - if let Some(file_path) = file_path { - let (records, fields) = query_cache.get_cached_records(&file_path).await?; - let response = QueryResponse { - records, - fields, - fill_null: query_request.send_null, - with_fields: query_request.fields, - } - .to_http()?; - - return Ok(response); - } - } - } + .get(USER_ID_HEADER_KEY) + .ok_or_else(|| QueryError::Anyhow(anyhow!("User Id not provided")))? + .to_str() + .map_err(|err| anyhow!(err))?; + + // deal with cached data + if let Ok(results) = get_results_from_cache( + show_cached, + query_cache_manager, + stream, + user_id, + &query_request.start_time, + &query_request.end_time, + &query_request.query, + query_request.send_null, + query_request.fields, + ) + .await + { + return results.to_http(); + }; let tables = visitor.into_inner(); - - if CONFIG.parseable.mode == Mode::Query { - for table in tables { - if let Ok(new_schema) = fetch_schema(&table).await { - // commit schema merges the schema internally and updates the schema in storage. - commit_schema_to_storage(&table, new_schema.clone()) - .await - .map_err(QueryError::ObjectStorage)?; - commit_schema(&table, Arc::new(new_schema)).map_err(QueryError::EventError)?; - } - } - } + update_schema_when_distributed(tables).await?; let mut query: LogicalQuery = into_query(&query_request, &session_state).await?; - let creds = extract_session_key_from_req(&req).expect("expects basic auth"); - let permissions: Vec = Users.get_permissions(&creds); + let creds = extract_session_key_from_req(&req)?; + let permissions = Users.get_permissions(&creds); let table_name = query .first_table_name() .ok_or_else(|| QueryError::MalformedQuery("No table name found in query"))?; + authorize_and_set_filter_tags(&mut query, permissions, &table_name)?; let time = Instant::now(); - let (records, fields) = query.execute(table_name.clone()).await?; - - match (cache_results, query_cache_manager) { - (None, None) => {} - (None, Some(_)) => {} - (Some(_), None) => { - log::warn!( - "Instructed to cache query results but Query Caching is not Enabled in Server" - ); - } - // do cache - (Some(_), Some(query_cache_manager)) => { - query_cache_manager - .create_parquet_cache( - &table_name, - &records, - query.start.to_rfc3339(), - query.end.to_rfc3339(), - query_request.query, - ) - .await? 
- } - } + // deal with cache saving + put_results_in_cache( + cache_results, + user_id, + query_cache_manager, + &table_name, + &records, + query.start.to_rfc3339(), + query.end.to_rfc3339(), + query_request.query, + ) + .await; let response = QueryResponse { records, @@ -195,7 +156,7 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result, table_name: &str, @@ -379,15 +340,22 @@ pub enum QueryError { ObjectStorage(#[from] ObjectStorageError), #[error("Cache Error: {0}")] CacheError(#[from] CacheError), + #[error("")] + CacheMiss, #[error("Evern Error: {0}")] EventError(#[from] EventError), #[error("Error: {0}")] MalformedQuery(&'static str), + #[allow(unused)] #[error( r#"Error: Failed to Parse Record Batch into Json Description: {0}"# )] JsonParse(String), + #[error("Error: {0}")] + ActixError(#[from] actix_web::Error), + #[error("Error: {0}")] + Anyhow(#[from] anyhow::Error), } impl actix_web::ResponseError for QueryError { diff --git a/server/src/querycache.rs b/server/src/querycache.rs index a531dd948..9fc5971ae 100644 --- a/server/src/querycache.rs +++ b/server/src/querycache.rs @@ -12,6 +12,7 @@ use std::path::{Path, PathBuf}; use tokio::fs as AsyncFs; use tokio::{fs, sync::Mutex}; +use crate::handlers::http::users::USERS_ROOT_DIR; use crate::metadata::STREAM_INFO; use crate::storage::staging::parquet_writer_props; use crate::{localcache::CacheError, option::CONFIG, utils::hostname_unchecked}; @@ -20,6 +21,7 @@ pub const QUERY_CACHE_FILENAME: &str = ".cache.json"; pub const QUERY_CACHE_META_FILENAME: &str = ".cache_meta.json"; pub const CURRENT_QUERY_CACHE_VERSION: &str = "v1"; +// .cache.json #[derive(Default, Clone, serde::Deserialize, serde::Serialize, Debug, Hash, Eq, PartialEq)] pub struct CacheMetadata { query: String, @@ -75,6 +77,7 @@ impl QueryCache { } } +// .cache_meta.json #[derive(Debug, serde::Deserialize, serde::Serialize)] pub struct QueryCacheMeta { version: String, @@ -92,15 +95,17 @@ impl QueryCacheMeta { pub struct QueryCacheManager { filesystem: LocalFileSystem, - cache_path: PathBuf, + cache_path: PathBuf, // refers to the path passed in the env var cache_capacity: u64, semaphore: Mutex<()>, } impl QueryCacheManager { - pub fn gen_file_path(query_staging_path: &str, stream: &str) -> PathBuf { + pub fn gen_file_path(query_staging_path: &str, stream: &str, user_id: &str) -> PathBuf { PathBuf::from_iter([ query_staging_path, + USERS_ROOT_DIR, + user_id, stream, &format!( "{}.{}.parquet", @@ -189,8 +194,8 @@ impl QueryCacheManager { Ok(()) } - pub async fn get_cache(&self, stream: &str) -> Result { - let path = query_cache_file_path(&self.cache_path, stream).unwrap(); + pub async fn get_cache(&self, stream: &str, user_id: &str) -> Result { + let path = query_cache_file_path(&self.cache_path, stream, user_id).unwrap(); let res = self .filesystem .get(&path) @@ -204,8 +209,14 @@ impl QueryCacheManager { Ok(cache) } - pub async fn put_cache(&self, stream: &str, cache: &QueryCache) -> Result<(), CacheError> { - let path = query_cache_file_path(&self.cache_path, stream).unwrap(); + pub async fn put_cache( + &self, + stream: &str, + cache: &QueryCache, + user_id: &str, + ) -> Result<(), CacheError> { + let path = query_cache_file_path(&self.cache_path, stream, user_id).unwrap(); + let bytes = serde_json::to_vec(cache)?.into(); let result = self.filesystem.put(&path, bytes).await?; log::info!("Cache file updated: {:?}", result); @@ -216,20 +227,16 @@ impl QueryCacheManager { &self, stream: &str, key: String, - staging_path: &Path, + file_path: &Path, + 
user_id: &str, ) -> Result<(), CacheError> { let lock = self.semaphore.lock().await; - let mut cache_path = self.cache_path.join(stream); - fs::create_dir_all(&cache_path).await?; - cache_path.push(staging_path.file_name().unwrap()); - // this needs to be the record batches parquet - // fs_extra::file::move_file(staging_path, &cache_path, &self.copy_options)?; - let file_size = std::fs::metadata(&cache_path)?.len(); - let mut cache = self.get_cache(stream).await?; + let file_size = std::fs::metadata(file_path)?.len(); + let mut cache = self.get_cache(stream, user_id).await?; while cache.current_size + file_size > self.cache_capacity { if let Some((_, file_for_removal)) = cache.files.pop_lru() { - let lru_file_size = std::fs::metadata(&file_for_removal)?.len(); + let lru_file_size = fs::metadata(&file_for_removal).await?.len(); cache.current_size = cache.current_size.saturating_sub(lru_file_size); log::info!("removing cache entry"); tokio::spawn(fs::remove_file(file_for_removal)); @@ -242,9 +249,9 @@ impl QueryCacheManager { if cache.files.is_full() { cache.files.resize(cache.files.capacity() * 2); } - cache.files.push(key, cache_path); + cache.files.push(key, file_path.to_path_buf()); cache.current_size += file_size; - self.put_cache(stream, &cache).await?; + self.put_cache(stream, &cache, user_id).await?; drop(lock); Ok(()) } @@ -253,12 +260,14 @@ impl QueryCacheManager { &self, table_name: &str, records: &[RecordBatch], + user_id: &str, start: String, end: String, query: String, ) -> Result<(), CacheError> { let parquet_path = Self::gen_file_path( self.cache_path.to_str().expect("utf-8 compat path"), + user_id, table_name, ); AsyncFs::create_dir_all(parquet_path.parent().expect("parent path exists")).await?; @@ -283,49 +292,44 @@ impl QueryCacheManager { table_name, format!("{}-{}-{}", start, end, query), &parquet_path, + user_id, ) .await - // match AsyncArrowWriter::try_new( - // parquet_file, - // STREAM_INFO.schema(&table_name).expect("schema present"), - // Some(props), - // ) { - // Ok(mut writer) => { - // for record in records { - // if let Err(e) = writer.write(record).await { - // log::error!("Error While Writing to Query Cache: {}", e); - // err_flag = true; - // } - // } - // - // if let Err(e) = writer.close().await { - // log::error!( - // "Unstable State: Async ArrowWriter Faild to close. Error: {}", - // e - // ); - // - // err_flag = true; - // } - // } - // Err(err) => { - // log::error!("Failed to create an Async ArrowWriter. Reason: {}", err); - // err_flag = true; - // } - // } - // - // if err_flag { - // log::error!("Failed to cache query result"); - // } else { - // - // } + } + + pub async fn clear_cache(&self, stream: &str, user_id: &str) -> Result<(), CacheError> { + let cache = self.get_cache(stream, user_id).await?; + let map = cache.files.values().collect_vec(); + let p_path = PathBuf::from_iter([USERS_ROOT_DIR, stream, user_id]); + let path = self.cache_path.join(p_path); + let mut paths = fs::read_dir(path).await?; + while let Some(path) = paths.next_entry().await? 
{ + let check = path.path().is_file() + && map.contains(&&path.path()) + && !path + .path() + .file_name() + .expect("File Name is Proper") + .to_str() + .expect("Path is Proper utf-8 ") + .ends_with(".json"); + if check { + fs::remove_file(path.path()).await?; + } + } + + Ok(()) } } fn query_cache_file_path( root: impl AsRef, stream: &str, + user_id: &str, ) -> Result { - let mut path = root.as_ref().join(stream); + let local_meta_path = PathBuf::from_iter([USERS_ROOT_DIR, stream, user_id]); + let mut path = root.as_ref().join(local_meta_path); + path.push(QUERY_CACHE_FILENAME); object_store::path::Path::from_absolute_path(path) } From 284882c0dbd7aaa7f222013df133f463fb8972a2 Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Wed, 15 May 2024 16:47:19 +0530 Subject: [PATCH 11/24] add ability to store filters and dashboards in memory --- .../src/handlers/http/modal/query_server.rs | 6 ++ server/src/handlers/http/modal/server.rs | 6 ++ server/src/handlers/http/users/dashboards.rs | 37 +++------ server/src/handlers/http/users/filters.rs | 19 +++-- server/src/main.rs | 1 + server/src/users/dashboards.rs | 77 +++++++++++++++++++ server/src/users/filters.rs | 66 ++++++++++++++++ server/src/users/mod.rs | 10 +++ 8 files changed, 185 insertions(+), 37 deletions(-) create mode 100644 server/src/users/dashboards.rs create mode 100644 server/src/users/filters.rs create mode 100644 server/src/users/mod.rs diff --git a/server/src/handlers/http/modal/query_server.rs b/server/src/handlers/http/modal/query_server.rs index efa570d21..353895400 100644 --- a/server/src/handlers/http/modal/query_server.rs +++ b/server/src/handlers/http/modal/query_server.rs @@ -22,6 +22,8 @@ use crate::handlers::http::middleware::RouteExt; use crate::handlers::http::{base_path, cross_origin_config, API_BASE_PATH, API_VERSION}; use crate::rbac::role::Action; +use crate::users::dashboards::DASHBOARDS; +use crate::users::filters::FILTERS; use crate::sync; use crate::{analytics, banner, metadata, metrics, migration, rbac, storage}; use actix_web::web; @@ -176,6 +178,10 @@ impl QueryServer { log::warn!("could not populate local metadata. {:?}", e); } + FILTERS.load().await?; + DASHBOARDS.load().await?; + + // load data from stats back to prometheus metrics metrics::fetch_stats_from_storage().await; metrics::reset_daily_metric_from_global(); diff --git a/server/src/handlers/http/modal/server.rs b/server/src/handlers/http/modal/server.rs index 4d086c61e..09f6bcb26 100644 --- a/server/src/handlers/http/modal/server.rs +++ b/server/src/handlers/http/modal/server.rs @@ -34,6 +34,8 @@ use crate::migration; use crate::rbac; use crate::storage; use crate::sync; +use crate::users::dashboards::DASHBOARDS; +use crate::users::filters::FILTERS; use std::sync::Arc; use actix_web::web::resource; @@ -476,6 +478,10 @@ impl Server { log::warn!("could not populate local metadata. 
{:?}", err); } + FILTERS.load().await?; + DASHBOARDS.load().await?; + + metrics::fetch_stats_from_storage().await; metrics::reset_daily_metric_from_global(); storage::retention::load_retention_from_global(); diff --git a/server/src/handlers/http/users/dashboards.rs b/server/src/handlers/http/users/dashboards.rs index d33b1600e..20017d506 100644 --- a/server/src/handlers/http/users/dashboards.rs +++ b/server/src/handlers/http/users/dashboards.rs @@ -2,6 +2,7 @@ use crate::{ handlers::http::ingest::PostError, option::CONFIG, storage::{object_storage::dashboard_path, ObjectStorageError}, + users::dashboards::{Dashboard, DASHBOARDS}, }; use actix_web::{http::header::ContentType, web, HttpRequest, HttpResponse, Responder}; use bytes::Bytes; @@ -44,13 +45,18 @@ pub async fn get(req: HttpRequest) -> Result { .get("dashboard_id") .ok_or(DashboardError::Metadata("No Dashboard Id Provided"))?; + if let Some(dashboard) = DASHBOARDS.find(dash_id) { + return Ok((web::Json(dashboard), StatusCode::OK)); + } + + //if dashboard is not in memory fetch from s3 let dash_file_path = dashboard_path(user_id, &format!("{}.json", dash_id)); let resource = CONFIG .storage() .get_object_store() .get_object(&dash_file_path) .await?; - let resource = serde_json::from_slice::(&resource)?; + let resource = serde_json::from_slice::(&resource)?; Ok((web::Json(resource), StatusCode::OK)) } @@ -68,6 +74,9 @@ pub async fn post(req: HttpRequest, body: Bytes) -> Result(&body)?; + DASHBOARDS.update(dashboard); + let store = CONFIG.storage().get_object_store(); store.put_object(&dash_file_path, body).await?; @@ -93,32 +102,6 @@ pub async fn delete(req: HttpRequest) -> Result { Ok(HttpResponse::Ok().finish()) } -// #[derive(Debug, Serialize, Deserialize)] -// pub struct Dashboard { -// version: String, -// name: String, -// id: String, -// time_filter: TimeFilter -// refresh_interval: u64, -// pannels: Vec, -// } -// -// #[derive(Debug, Serialize, Deserialize)] -// pub struct Pannel { -// stream_name: String, -// query: String, -// chart_type: String, -// columns: Vec, -// headers: Vec, -// dimensions: (u64, u64), -// } -// -// #[derive(Debug, Serialize, Deserialize)] -// pub struct TimeFilter { -// to: String, -// from: String -// } - #[derive(Debug, thiserror::Error)] pub enum DashboardError { #[error("Failed to connect to storage: {0}")] diff --git a/server/src/handlers/http/users/filters.rs b/server/src/handlers/http/users/filters.rs index 2e4c4d1f3..6ac5f924d 100644 --- a/server/src/handlers/http/users/filters.rs +++ b/server/src/handlers/http/users/filters.rs @@ -2,6 +2,7 @@ use crate::{ handlers::{http::ingest::PostError, STREAM_NAME_HEADER_KEY}, option::CONFIG, storage::{object_storage::filter_path, ObjectStorageError}, + users::filters::{Filter, FILTERS}, }; use actix_web::{http::header::ContentType, web, HttpRequest, HttpResponse, Responder}; use bytes::Bytes; @@ -62,6 +63,11 @@ pub async fn get(req: HttpRequest) -> Result { .to_str() .map_err(|_| FiltersError::Metadata("Non ASCII Stream Name Provided"))?; + if let Some(filter) = FILTERS.find(filt_id) { + return Ok((web::Json(filter), StatusCode::OK)); + } + + // if it is not in memory go to s3 let path = filter_path(user_id, stream_name, &format!("{}.json", filt_id)); let resource = CONFIG .storage() @@ -69,7 +75,7 @@ pub async fn get(req: HttpRequest) -> Result { .get_object(&path) .await?; - let resource = serde_json::from_slice::(&resource)?; + let resource = serde_json::from_slice::(&resource)?; Ok((web::Json(resource), StatusCode::OK)) } @@ -95,6 +101,8 @@ pub 
async fn post(req: HttpRequest, body: Bytes) -> Result = Lazy::new(Dashboards::default); + +#[derive(Debug, Serialize, Deserialize, Default, Clone)] +pub struct Pannel { + stream_name: String, + query: String, + chart_type: String, + columns: Vec, + headers: Vec, + dimensions: (u64, u64), +} + +#[derive(Debug, Serialize, Deserialize, Default, Clone)] +pub struct Dashboard { + version: String, + name: String, + id: String, + time_filter: TimeFilter, + refresh_interval: u64, + pannels: Vec, +} + +impl Dashboard { + pub fn dashboard_id(&self) -> &str { + &self.id + } +} + +#[derive(Default)] +pub struct Dashboards(RwLock>); + +impl Dashboards { + pub async fn load(&self) -> anyhow::Result<()> { + let mut this = vec![]; + let path = RelativePathBuf::from(USERS_ROOT_DIR); + let store = CONFIG.storage().get_object_store(); + let objs = store + .get_objects(Some(&path), Box::new(|path| path.ends_with(".json"))) + .await?; + + for obj in objs { + if let Ok(filter) = serde_json::from_slice::(&obj) { + this.push(filter); + } + } + + let mut s = self.0.write().expect(LOCK_EXPECT); + s.append(&mut this); + + Ok(()) + } + + pub fn update(&self, dashboard: Dashboard) { + let mut s = self.0.write().expect(LOCK_EXPECT); + + s.push(dashboard); + } + + pub fn find(&self, dashboard_id: &str) -> Option { + self.0 + .read() + .expect(LOCK_EXPECT) + .iter() + .find(|dashboard| dashboard.dashboard_id() == dashboard_id) + .cloned() + } +} diff --git a/server/src/users/filters.rs b/server/src/users/filters.rs new file mode 100644 index 000000000..ee3266bba --- /dev/null +++ b/server/src/users/filters.rs @@ -0,0 +1,66 @@ +use std::sync::RwLock; + +use once_cell::sync::Lazy; +use relative_path::RelativePathBuf; +use serde::{Deserialize, Serialize}; + +use super::TimeFilter; +use crate::{handlers::http::users::USERS_ROOT_DIR, metadata::LOCK_EXPECT, option::CONFIG}; + +pub static FILTERS: Lazy = Lazy::new(Filters::default); + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct Filter { + version: String, + stream_name: String, + filter_name: String, + filter_id: String, + query: String, + time_filter: Option, +} + +impl Filter { + pub fn filter_id(&self) -> &str { + &self.filter_id + } +} + +#[derive(Debug, Default)] +pub struct Filters(RwLock>); + +impl Filters { + pub async fn load(&self) -> anyhow::Result<()> { + let mut this = vec![]; + let path = RelativePathBuf::from(USERS_ROOT_DIR); + let store = CONFIG.storage().get_object_store(); + let objs = store + .get_objects(Some(&path), Box::new(|path| path.ends_with(".json"))) + .await?; + + for obj in objs { + if let Ok(filter) = serde_json::from_slice::(&obj) { + this.push(filter); + } + } + + let mut s = self.0.write().expect(LOCK_EXPECT); + s.append(&mut this); + + Ok(()) + } + + pub fn update(&self, filter: Filter) { + let mut s = self.0.write().expect(LOCK_EXPECT); + + s.push(filter); + } + + pub fn find(&self, filter_id: &str) -> Option { + self.0 + .read() + .expect(LOCK_EXPECT) + .iter() + .find(|filter| filter.filter_id() == filter_id) + .cloned() + } +} diff --git a/server/src/users/mod.rs b/server/src/users/mod.rs new file mode 100644 index 000000000..09acef4f4 --- /dev/null +++ b/server/src/users/mod.rs @@ -0,0 +1,10 @@ +pub mod dashboards; +pub mod filters; + +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Serialize, Deserialize, Default, Clone)] +pub struct TimeFilter { + to: String, + from: String, +} From 16f5a39a52099340680b2b5cb96282cc5f8abfca Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Wed, 15 May 2024 17:21:58 +0530 
Subject: [PATCH 12/24] fix: bug if user_id is not provided --- server/src/handlers/http/query.rs | 116 ++++++++++++++++++++++++++++-- server/src/querycache.rs | 21 +++++- 2 files changed, 129 insertions(+), 8 deletions(-) diff --git a/server/src/handlers/http/query.rs b/server/src/handlers/http/query.rs index c5fa0aa7a..f8fa46289 100644 --- a/server/src/handlers/http/query.rs +++ b/server/src/handlers/http/query.rs @@ -87,12 +87,7 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result Result) -> Result<(), QueryError> { + if CONFIG.parseable.mode == Mode::Query { + for table in tables { + if let Ok(new_schema) = fetch_schema(&table).await { + // commit schema merges the schema internally and updates the schema in storage. + commit_schema_to_storage(&table, new_schema.clone()).await?; + + commit_schema(&table, Arc::new(new_schema))?; + } + } + } + + Ok(()) +} + +#[allow(clippy::too_many_arguments)] +async fn put_results_in_cache( + cache_results: Option<&HeaderValue>, + user_id: Option<&HeaderValue>, + query_cache_manager: Option<&QueryCacheManager>, + stream: &str, + records: &[RecordBatch], + start: String, + end: String, + query: String, +) { + match (cache_results, query_cache_manager) { + (Some(_), None) => { + log::warn!( + "Instructed to cache query results but Query Caching is not Enabled in Server" + ); + } + // do cache + (Some(_), Some(query_cache_manager)) => { + let user_id = user_id + .expect("User Id was provided") + .to_str() + .expect("is proper ASCII"); + + if let Err(err) = query_cache_manager + .create_parquet_cache(stream, records, user_id, start, end, query) + .await + { + log::error!("Error occured while caching query results: {:?}", err); + if query_cache_manager + .clear_cache(stream, user_id) + .await + .is_err() + { + log::error!("Error Clearing Unwanted files from cache dir"); + } + } + } + (None, _) => {} + } +} + +#[allow(clippy::too_many_arguments)] +async fn get_results_from_cache( + show_cached: Option<&HeaderValue>, + query_cache_manager: Option<&QueryCacheManager>, + stream: &str, + user_id: Option<&HeaderValue>, + start_time: &str, + end_time: &str, + query: &str, + send_null: bool, + send_fields: bool, +) -> Result { + match (show_cached, query_cache_manager) { + (Some(_), None) => { + log::warn!( + "Instructed to show cached results but Query Caching is not Enabled on Server" + ); + None + } + (Some(_), Some(query_cache_manager)) => { + let user_id = user_id + .ok_or_else(|| QueryError::Anyhow(anyhow!("User Id not provided")))? + .to_str() + .map_err(|err| anyhow!(err))?; + + let mut query_cache = query_cache_manager.get_cache(stream, user_id).await?; + + let (start, end) = parse_human_time(start_time, end_time)?; + let key = format!("{}-{}-{}", start.to_rfc3339(), end.to_rfc3339(), query); + + let file_path = query_cache.get_file(key); + if let Some(file_path) = file_path { + let (records, fields) = query_cache.get_cached_records(&file_path).await?; + let response = QueryResponse { + records, + fields, + fill_null: send_null, + with_fields: send_fields, + }; + + Some(Ok(response)) + } else { + None + } + } + (_, _) => None, + } + .map_or_else(|| Err(QueryError::CacheMiss), |ret_val| ret_val) +} + +pub fn authorize_and_set_filter_tags( query: &mut LogicalQuery, permissions: Vec, table_name: &str, diff --git a/server/src/querycache.rs b/server/src/querycache.rs index 9fc5971ae..6d400dd3d 100644 --- a/server/src/querycache.rs +++ b/server/src/querycache.rs @@ -1,3 +1,21 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. 
+ * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + use arrow_array::RecordBatch; use chrono::Utc; use futures::TryStreamExt; @@ -8,6 +26,7 @@ use itertools::Itertools; use object_store::{local::LocalFileSystem, ObjectStore}; use once_cell::sync::OnceCell; use parquet::arrow::{AsyncArrowWriter, ParquetRecordBatchStreamBuilder}; +use std::collections::HashMap; use std::path::{Path, PathBuf}; use tokio::fs as AsyncFs; use tokio::{fs, sync::Mutex}; @@ -273,7 +292,7 @@ impl QueryCacheManager { AsyncFs::create_dir_all(parquet_path.parent().expect("parent path exists")).await?; let parquet_file = AsyncFs::File::create(&parquet_path).await?; let time_partition = STREAM_INFO.get_time_partition(table_name)?; - let props = parquet_writer_props(time_partition.clone(), 0).build(); + let props = parquet_writer_props(time_partition.clone(), 0, HashMap::new()).build(); let mut arrow_writer = AsyncArrowWriter::try_new( parquet_file, From 634889657c41426aae2f6a39d8f32e093c988a2c Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Fri, 17 May 2024 14:49:50 +0530 Subject: [PATCH 13/24] misc add license headings --- server/src/handlers/airplane.rs | 18 ++++++++++++++++++ server/src/handlers/http/users/dashboards.rs | 18 ++++++++++++++++++ server/src/handlers/http/users/filters.rs | 18 ++++++++++++++++++ server/src/handlers/http/users/mod.rs | 18 ++++++++++++++++++ server/src/metrics/prom_utils.rs | 18 ++++++++++++++++++ server/src/static_schema.rs | 18 ++++++++++++++++++ server/src/storage/metrics_layer.rs | 18 ++++++++++++++++++ server/src/users/dashboards.rs | 18 ++++++++++++++++++ server/src/users/filters.rs | 18 ++++++++++++++++++ server/src/users/mod.rs | 18 ++++++++++++++++++ server/src/utils/arrow/flight.rs | 18 ++++++++++++++++++ 11 files changed, 198 insertions(+) diff --git a/server/src/handlers/airplane.rs b/server/src/handlers/airplane.rs index 8fd83f35d..31b2cdbee 100644 --- a/server/src/handlers/airplane.rs +++ b/server/src/handlers/airplane.rs @@ -1,3 +1,21 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
+ * + */ + use arrow_array::RecordBatch; use arrow_flight::encode::FlightDataEncoderBuilder; use arrow_flight::flight_service_server::FlightServiceServer; diff --git a/server/src/handlers/http/users/dashboards.rs b/server/src/handlers/http/users/dashboards.rs index 20017d506..a2997f53a 100644 --- a/server/src/handlers/http/users/dashboards.rs +++ b/server/src/handlers/http/users/dashboards.rs @@ -1,3 +1,21 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + use crate::{ handlers::http::ingest::PostError, option::CONFIG, diff --git a/server/src/handlers/http/users/filters.rs b/server/src/handlers/http/users/filters.rs index 6ac5f924d..49179667e 100644 --- a/server/src/handlers/http/users/filters.rs +++ b/server/src/handlers/http/users/filters.rs @@ -1,3 +1,21 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + use crate::{ handlers::{http::ingest::PostError, STREAM_NAME_HEADER_KEY}, option::CONFIG, diff --git a/server/src/handlers/http/users/mod.rs b/server/src/handlers/http/users/mod.rs index df6e9f2c4..e4931aef6 100644 --- a/server/src/handlers/http/users/mod.rs +++ b/server/src/handlers/http/users/mod.rs @@ -1,3 +1,21 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + pub mod dashboards; pub mod filters; diff --git a/server/src/metrics/prom_utils.rs b/server/src/metrics/prom_utils.rs index cd01fbd76..2df0d85a0 100644 --- a/server/src/metrics/prom_utils.rs +++ b/server/src/metrics/prom_utils.rs @@ -1,3 +1,21 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. 
+ * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + use crate::handlers::http::base_path_without_preceding_slash; use crate::handlers::http::ingest::PostError; use crate::handlers::http::modal::IngestorMetadata; diff --git a/server/src/static_schema.rs b/server/src/static_schema.rs index 488f88f88..5f9f60fcf 100644 --- a/server/src/static_schema.rs +++ b/server/src/static_schema.rs @@ -1,3 +1,21 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + use crate::event::{DEFAULT_METADATA_KEY, DEFAULT_TAGS_KEY, DEFAULT_TIMESTAMP_KEY}; use crate::utils::arrow::get_field; use anyhow::{anyhow, Error as AnyError}; diff --git a/server/src/storage/metrics_layer.rs b/server/src/storage/metrics_layer.rs index 7fa257074..04c2f3346 100644 --- a/server/src/storage/metrics_layer.rs +++ b/server/src/storage/metrics_layer.rs @@ -1,3 +1,21 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + use std::{ ops::Range, task::{Context, Poll}, diff --git a/server/src/users/dashboards.rs b/server/src/users/dashboards.rs index 3dbeef7e2..a447244c5 100644 --- a/server/src/users/dashboards.rs +++ b/server/src/users/dashboards.rs @@ -1,3 +1,21 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. 
+ * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + use std::sync::RwLock; use once_cell::sync::Lazy; diff --git a/server/src/users/filters.rs b/server/src/users/filters.rs index ee3266bba..fe3dfa9b2 100644 --- a/server/src/users/filters.rs +++ b/server/src/users/filters.rs @@ -1,3 +1,21 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + use std::sync::RwLock; use once_cell::sync::Lazy; diff --git a/server/src/users/mod.rs b/server/src/users/mod.rs index 09acef4f4..7bfac6726 100644 --- a/server/src/users/mod.rs +++ b/server/src/users/mod.rs @@ -1,3 +1,21 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + pub mod dashboards; pub mod filters; diff --git a/server/src/utils/arrow/flight.rs b/server/src/utils/arrow/flight.rs index 113407c9d..444e1e9ee 100644 --- a/server/src/utils/arrow/flight.rs +++ b/server/src/utils/arrow/flight.rs @@ -1,3 +1,21 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
+ * + */ + use crate::event::Event; use crate::handlers::http::ingest::push_logs_unchecked; use crate::handlers::http::query::Query as QueryJson; From 9d35e819d31ee078572702f97bc42e9b8e9bf6bc Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Fri, 17 May 2024 16:51:07 +0530 Subject: [PATCH 14/24] cleanup rough edges --- server/src/handlers/http/users/dashboards.rs | 1 + server/src/users/dashboards.rs | 13 +++++++------ server/src/users/filters.rs | 7 ++++--- 3 files changed, 12 insertions(+), 9 deletions(-) diff --git a/server/src/handlers/http/users/dashboards.rs b/server/src/handlers/http/users/dashboards.rs index a2997f53a..4f2bbc6d3 100644 --- a/server/src/handlers/http/users/dashboards.rs +++ b/server/src/handlers/http/users/dashboards.rs @@ -24,6 +24,7 @@ use crate::{ }; use actix_web::{http::header::ContentType, web, HttpRequest, HttpResponse, Responder}; use bytes::Bytes; + use http::StatusCode; use serde_json::{Error as SerdeError, Value as JsonValue}; diff --git a/server/src/users/dashboards.rs b/server/src/users/dashboards.rs index a447244c5..561ec2507 100644 --- a/server/src/users/dashboards.rs +++ b/server/src/users/dashboards.rs @@ -35,14 +35,14 @@ pub struct Pannel { chart_type: String, columns: Vec, headers: Vec, - dimensions: (u64, u64), + dimensions: Vec, } #[derive(Debug, Serialize, Deserialize, Default, Clone)] pub struct Dashboard { version: String, - name: String, - id: String, + dashboard_name: String, + dashboard_id: String, time_filter: TimeFilter, refresh_interval: u64, pannels: Vec, @@ -50,11 +50,11 @@ pub struct Dashboard { impl Dashboard { pub fn dashboard_id(&self) -> &str { - &self.id + &self.dashboard_id } } -#[derive(Default)] +#[derive(Default, Debug)] pub struct Dashboards(RwLock>); impl Dashboards { @@ -64,7 +64,8 @@ impl Dashboards { let store = CONFIG.storage().get_object_store(); let objs = store .get_objects(Some(&path), Box::new(|path| path.ends_with(".json"))) - .await?; + .await + .unwrap_or_default(); for obj in objs { if let Ok(filter) = serde_json::from_slice::(&obj) { diff --git a/server/src/users/filters.rs b/server/src/users/filters.rs index fe3dfa9b2..87580a8c6 100644 --- a/server/src/users/filters.rs +++ b/server/src/users/filters.rs @@ -16,11 +16,10 @@ * */ -use std::sync::RwLock; - use once_cell::sync::Lazy; use relative_path::RelativePathBuf; use serde::{Deserialize, Serialize}; +use std::sync::RwLock; use super::TimeFilter; use crate::{handlers::http::users::USERS_ROOT_DIR, metadata::LOCK_EXPECT, option::CONFIG}; @@ -51,9 +50,11 @@ impl Filters { let mut this = vec![]; let path = RelativePathBuf::from(USERS_ROOT_DIR); let store = CONFIG.storage().get_object_store(); + let objs = store .get_objects(Some(&path), Box::new(|path| path.ends_with(".json"))) - .await?; + .await + .unwrap_or_default(); for obj in objs { if let Ok(filter) = serde_json::from_slice::(&obj) { From 83a358cc3482c6100599e1523c810504aa8be429 Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Tue, 21 May 2024 12:55:18 +0530 Subject: [PATCH 15/24] update arrow flight server to perform query cache --- server/src/handlers/airplane.rs | 116 +++++++++++------- .../src/handlers/http/modal/query_server.rs | 1 - server/src/handlers/http/modal/server.rs | 1 - server/src/handlers/http/query.rs | 42 ++++--- server/src/response.rs | 13 +- server/src/storage/object_storage.rs | 2 +- server/src/utils/arrow/flight.rs | 31 ++++- 7 files changed, 133 insertions(+), 73 deletions(-) diff --git a/server/src/handlers/airplane.rs b/server/src/handlers/airplane.rs index 
31b2cdbee..8ab7270f5 100644 --- a/server/src/handlers/airplane.rs +++ b/server/src/handlers/airplane.rs @@ -17,7 +17,6 @@ */ use arrow_array::RecordBatch; -use arrow_flight::encode::FlightDataEncoderBuilder; use arrow_flight::flight_service_server::FlightServiceServer; use arrow_flight::PollInfo; use arrow_schema::ArrowError; @@ -25,7 +24,6 @@ use arrow_schema::ArrowError; use datafusion::common::tree_node::TreeNode; use serde_json::json; use std::net::SocketAddr; -use std::sync::Arc; use std::time::Instant; use tonic::codec::CompressionEncoding; @@ -34,20 +32,22 @@ use futures_util::{Future, TryFutureExt}; use tonic::transport::{Identity, Server, ServerTlsConfig}; use tonic_web::GrpcWebLayer; -use crate::event::commit_schema; use crate::handlers::http::cluster::get_ingestor_info; -use crate::handlers::http::fetch_schema; +use crate::handlers::{CACHE_RESULTS_HEADER_KEY, CACHE_VIEW_HEADER_KEY, USER_ID_HEADER_KEY}; use crate::metrics::QUERY_EXECUTE_TIME; -use crate::option::{Mode, CONFIG}; +use crate::option::CONFIG; use crate::handlers::livetail::cross_origin_config; -use crate::handlers::http::query::{authorize_and_set_filter_tags, into_query}; +use crate::handlers::http::query::{ + authorize_and_set_filter_tags, into_query, put_results_in_cache, update_schema_when_distributed, +}; use crate::query::{TableScanVisitor, QUERY_SESSION}; -use crate::storage::object_storage::commit_schema_to_storage; +use crate::querycache::QueryCacheManager; use crate::utils::arrow::flight::{ - append_temporary_events, get_query_from_ticket, run_do_get_rpc, send_to_ingester, + append_temporary_events, get_query_from_ticket, into_flight_data, run_do_get_rpc, + send_to_ingester, }; use arrow_flight::{ flight_service_server::FlightService, Action, ActionType, Criteria, Empty, FlightData, @@ -55,13 +55,15 @@ use arrow_flight::{ SchemaResult, Ticket, }; use arrow_ipc::writer::IpcWriteOptions; -use futures::{stream, TryStreamExt}; +use futures::stream; use tonic::{Request, Response, Status, Streaming}; use crate::handlers::livetail::extract_session_key; use crate::metadata::STREAM_INFO; use crate::rbac::Users; +use super::http::query::get_results_from_cache; + #[derive(Clone, Debug)] pub struct AirServiceImpl {} @@ -130,7 +132,7 @@ impl FlightService for AirServiceImpl { async fn do_get(&self, req: Request) -> Result, Status> { let key = extract_session_key(req.metadata())?; - let ticket = get_query_from_ticket(req)?; + let ticket = get_query_from_ticket(&req)?; log::info!("query requested to airplane: {:?}", ticket); @@ -150,32 +152,57 @@ impl FlightService for AirServiceImpl { let mut visitor = TableScanVisitor::default(); let _ = raw_logical_plan.visit(&mut visitor); - let tables = visitor.into_inner(); - - if CONFIG.parseable.mode == Mode::Query { - // using http to get the schema. may update to use flight later - for table in tables { - if let Ok(new_schema) = fetch_schema(&table).await { - // commit schema merges the schema internally and updates the schema in storage. - commit_schema_to_storage(&table, new_schema.clone()) - .await - .map_err(|err| Status::internal(err.to_string()))?; - commit_schema(&table, Arc::new(new_schema)) - .map_err(|err| Status::internal(err.to_string()))?; - } - } + let streams = visitor.into_inner(); + + let query_cache_manager = QueryCacheManager::global(CONFIG.parseable.query_cache_size) + .await + .unwrap_or(None); + + let cache_results = req + .metadata() + .get(CACHE_RESULTS_HEADER_KEY) + .and_then(|value| value.to_str().ok()); // I dont think we need to own this. 
+ + let show_cached = req + .metadata() + .get(CACHE_VIEW_HEADER_KEY) + .and_then(|value| value.to_str().ok()); + + let user_id = req + .metadata() + .get(USER_ID_HEADER_KEY) + .and_then(|value| value.to_str().ok()); + let stream_name = streams + .first() + .ok_or_else(|| Status::aborted("Malformed SQL Provided, Table Name Not Found"))? + .to_owned(); + + // send the cached results + if let Ok(cache_results) = get_results_from_cache( + show_cached, + query_cache_manager, + &stream_name, + user_id, + &ticket.start_time, + &ticket.end_time, + &ticket.query, + ticket.send_null, + ticket.fields, + ) + .await + { + return cache_results.into_flight(); } + update_schema_when_distributed(streams) + .await + .map_err(|err| Status::internal(err.to_string()))?; + // map payload to query let mut query = into_query(&ticket, &session_state) .await .map_err(|_| Status::internal("Failed to parse query"))?; - // if table name is not present it is a Malformed Query - let stream_name = query - .first_table_name() - .ok_or_else(|| Status::invalid_argument("Malformed Query"))?; - let event = if send_to_ingester(query.start.timestamp_millis(), query.end.timestamp_millis()) { let sql = format!("select * from {}", &stream_name); @@ -210,11 +237,23 @@ impl FlightService for AirServiceImpl { Status::permission_denied("User Does not have permission to access this") })?; let time = Instant::now(); - let (results, _) = query + let (records, _) = query .execute(stream_name.clone()) .await .map_err(|err| Status::internal(err.to_string()))?; + put_results_in_cache( + cache_results, + user_id, + query_cache_manager, + &stream_name, + &records, + query.start.to_rfc3339(), + query.end.to_rfc3339(), + ticket.query, + ) + .await; + /* * INFO: No returning the schema with the data. * kept it in case it needs to be sent in the future. 
@@ -226,18 +265,7 @@ impl FlightService for AirServiceImpl { .collect::>(); let schema = Schema::try_merge(schemas).map_err(|err| Status::internal(err.to_string()))?; */ - let input_stream = futures::stream::iter(results.into_iter().map(Ok)); - let write_options = IpcWriteOptions::default() - .try_with_compression(Some(arrow_ipc::CompressionType(1))) - .map_err(|err| Status::failed_precondition(err.to_string()))?; - - let flight_data_stream = FlightDataEncoderBuilder::new() - .with_max_flight_data_size(usize::MAX) - .with_options(write_options) - // .with_schema(schema.into()) - .build(input_stream); - - let flight_data_stream = flight_data_stream.map_err(|err| Status::unknown(err.to_string())); + let out = into_flight_data(records); if let Some(event) = event { event.clear(&stream_name); @@ -248,9 +276,7 @@ impl FlightService for AirServiceImpl { .with_label_values(&[&format!("flight-query-{}", stream_name)]) .observe(time); - Ok(Response::new( - Box::pin(flight_data_stream) as Self::DoGetStream - )) + out } async fn do_put( diff --git a/server/src/handlers/http/modal/query_server.rs b/server/src/handlers/http/modal/query_server.rs index 353895400..a1ba35fa6 100644 --- a/server/src/handlers/http/modal/query_server.rs +++ b/server/src/handlers/http/modal/query_server.rs @@ -181,7 +181,6 @@ impl QueryServer { FILTERS.load().await?; DASHBOARDS.load().await?; - // load data from stats back to prometheus metrics metrics::fetch_stats_from_storage().await; metrics::reset_daily_metric_from_global(); diff --git a/server/src/handlers/http/modal/server.rs b/server/src/handlers/http/modal/server.rs index 09f6bcb26..c85bd1323 100644 --- a/server/src/handlers/http/modal/server.rs +++ b/server/src/handlers/http/modal/server.rs @@ -481,7 +481,6 @@ impl Server { FILTERS.load().await?; DASHBOARDS.load().await?; - metrics::fetch_stats_from_storage().await; metrics::reset_daily_metric_from_global(); storage::retention::load_retention_from_global(); diff --git a/server/src/handlers/http/query.rs b/server/src/handlers/http/query.rs index f8fa46289..2cd2d94f5 100644 --- a/server/src/handlers/http/query.rs +++ b/server/src/handlers/http/query.rs @@ -25,7 +25,7 @@ use datafusion::common::tree_node::TreeNode; use datafusion::error::DataFusionError; use datafusion::execution::context::SessionState; use futures_util::Future; -use http::{HeaderValue, StatusCode}; +use http::StatusCode; use std::collections::HashMap; use std::pin::Pin; use std::sync::Arc; @@ -85,9 +85,18 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result Result) -> Result<(), QueryError> { +pub async fn update_schema_when_distributed(tables: Vec) -> Result<(), QueryError> { if CONFIG.parseable.mode == Mode::Query { for table in tables { if let Ok(new_schema) = fetch_schema(&table).await { @@ -167,9 +176,9 @@ async fn update_schema_when_distributed(tables: Vec) -> Result<(), Query } #[allow(clippy::too_many_arguments)] -async fn put_results_in_cache( - cache_results: Option<&HeaderValue>, - user_id: Option<&HeaderValue>, +pub async fn put_results_in_cache( + cache_results: Option<&str>, + user_id: Option<&str>, query_cache_manager: Option<&QueryCacheManager>, stream: &str, records: &[RecordBatch], @@ -185,10 +194,7 @@ async fn put_results_in_cache( } // do cache (Some(_), Some(query_cache_manager)) => { - let user_id = user_id - .expect("User Id was provided") - .to_str() - .expect("is proper ASCII"); + let user_id = user_id.expect("User Id was provided"); if let Err(err) = query_cache_manager .create_parquet_cache(stream, records, 
user_id, start, end, query) @@ -209,11 +215,11 @@ async fn put_results_in_cache( } #[allow(clippy::too_many_arguments)] -async fn get_results_from_cache( - show_cached: Option<&HeaderValue>, +pub async fn get_results_from_cache( + show_cached: Option<&str>, query_cache_manager: Option<&QueryCacheManager>, stream: &str, - user_id: Option<&HeaderValue>, + user_id: Option<&str>, start_time: &str, end_time: &str, query: &str, @@ -228,10 +234,8 @@ async fn get_results_from_cache( None } (Some(_), Some(query_cache_manager)) => { - let user_id = user_id - .ok_or_else(|| QueryError::Anyhow(anyhow!("User Id not provided")))? - .to_str() - .map_err(|err| anyhow!(err))?; + let user_id = + user_id.ok_or_else(|| QueryError::Anyhow(anyhow!("User Id not provided")))?; let mut query_cache = query_cache_manager.get_cache(stream, user_id).await?; diff --git a/server/src/response.rs b/server/src/response.rs index 6ea07bda4..e2abfa2d2 100644 --- a/server/src/response.rs +++ b/server/src/response.rs @@ -16,11 +16,18 @@ * */ -use crate::{handlers::http::query::QueryError, utils::arrow::record_batches_to_json}; +use crate::{ + handlers::http::query::QueryError, + utils::arrow::{ + flight::{into_flight_data, DoGetStream}, + record_batches_to_json, + }, +}; use actix_web::{web, Responder}; use datafusion::arrow::record_batch::RecordBatch; use itertools::Itertools; use serde_json::{json, Value}; +use tonic::{Response, Status}; pub struct QueryResponse { pub records: Vec, @@ -57,4 +64,8 @@ impl QueryResponse { Ok(web::Json(response)) } + + pub fn into_flight(self) -> Result, Status> { + into_flight_data(self.records) + } } diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs index 4fbca6f48..5af74cd60 100644 --- a/server/src/storage/object_storage.rs +++ b/server/src/storage/object_storage.rs @@ -26,8 +26,8 @@ use super::{ }; use crate::handlers::http::modal::ingest_server::INGESTOR_META; -use crate::metrics::{LIFETIME_EVENTS_STORAGE_SIZE, STORAGE_SIZE_TODAY}; use crate::handlers::http::users::{DASHBOARDS_DIR, FILTER_DIR, USERS_ROOT_DIR}; +use crate::metrics::{LIFETIME_EVENTS_STORAGE_SIZE, STORAGE_SIZE_TODAY}; use crate::option::Mode; use crate::{ alerts::Alerts, diff --git a/server/src/utils/arrow/flight.rs b/server/src/utils/arrow/flight.rs index 444e1e9ee..3b0b17eba 100644 --- a/server/src/utils/arrow/flight.rs +++ b/server/src/utils/arrow/flight.rs @@ -27,21 +27,25 @@ use crate::{ }; use arrow_array::RecordBatch; -use arrow_flight::Ticket; +use arrow_flight::encode::FlightDataEncoderBuilder; +use arrow_flight::{FlightData, Ticket}; +use arrow_ipc::writer::IpcWriteOptions; use arrow_select::concat::concat_batches; use datafusion::logical_expr::BinaryExpr; use datafusion::prelude::Expr; use datafusion::scalar::ScalarValue; -use futures::TryStreamExt; +use futures::{stream, TryStreamExt}; -use tonic::{Request, Status}; +use tonic::{Request, Response, Status}; use arrow_flight::FlightClient; use http::Uri; use tonic::transport::Channel; -pub fn get_query_from_ticket(req: Request) -> Result { - serde_json::from_slice::(&req.into_inner().ticket) +pub type DoGetStream = stream::BoxStream<'static, Result>; + +pub fn get_query_from_ticket(req: &Request) -> Result { + serde_json::from_slice::(&req.get_ref().ticket) .map_err(|err| Status::internal(err.to_string())) } @@ -137,3 +141,20 @@ pub fn send_to_ingester(start: i64, end: i64) -> bool { fn lit_timestamp_milli(time: i64) -> Expr { Expr::Literal(ScalarValue::TimestampMillisecond(Some(time), None)) } + +pub fn 
into_flight_data(records: Vec) -> Result, Status> { + let input_stream = futures::stream::iter(records.into_iter().map(Ok)); + let write_options = IpcWriteOptions::default() + .try_with_compression(Some(arrow_ipc::CompressionType(1))) + .map_err(|err| Status::failed_precondition(err.to_string()))?; + + let flight_data_stream = FlightDataEncoderBuilder::new() + .with_max_flight_data_size(usize::MAX) + .with_options(write_options) + // .with_schema(schema.into()) + .build(input_stream); + + let flight_data_stream = flight_data_stream.map_err(|err| Status::unknown(err.to_string())); + + Ok(Response::new(Box::pin(flight_data_stream) as DoGetStream)) +} From e11f1db8002a89359ea39359bf5ac6c74fba6fff Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Tue, 21 May 2024 13:38:02 +0530 Subject: [PATCH 16/24] fix: users root dir excluded when listing streams --- server/src/storage/s3.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/server/src/storage/s3.rs b/server/src/storage/s3.rs index 430aa6b0d..00ac3c01c 100644 --- a/server/src/storage/s3.rs +++ b/server/src/storage/s3.rs @@ -38,6 +38,7 @@ use std::path::Path as StdPath; use std::sync::Arc; use std::time::{Duration, Instant}; +use crate::handlers::http::users::USERS_ROOT_DIR; use crate::metrics::storage::{s3::REQUEST_RESPONSE_TIME, StorageMetrics}; use crate::storage::{LogStream, ObjectStorage, ObjectStorageError, PARSEABLE_ROOT_DIRECTORY}; @@ -305,6 +306,7 @@ impl S3 { .filter_map(|path| path.parts().next()) .map(|name| name.as_ref().to_string()) .filter(|x| x != PARSEABLE_ROOT_DIRECTORY) + .filter(|x| x != USERS_ROOT_DIR) .collect(); let stream_json_check = FuturesUnordered::new(); From 9e9b4c7f469087ffa2b99e49b5ed91c8b19e36da Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Tue, 21 May 2024 16:57:16 +0530 Subject: [PATCH 17/24] change the key for cache --- server/src/handlers/http/query.rs | 9 ++++++--- server/src/querycache.rs | 20 +++++++++++++++----- 2 files changed, 21 insertions(+), 8 deletions(-) diff --git a/server/src/handlers/http/query.rs b/server/src/handlers/http/query.rs index 2cd2d94f5..be46788e1 100644 --- a/server/src/handlers/http/query.rs +++ b/server/src/handlers/http/query.rs @@ -43,7 +43,7 @@ use crate::option::{Mode, CONFIG}; use crate::query::error::ExecuteError; use crate::query::Query as LogicalQuery; use crate::query::{TableScanVisitor, QUERY_SESSION}; -use crate::querycache::QueryCacheManager; +use crate::querycache::{CacheMetadata, QueryCacheManager}; use crate::rbac::role::{Action, Permission}; use crate::rbac::Users; use crate::response::QueryResponse; @@ -240,9 +240,12 @@ pub async fn get_results_from_cache( let mut query_cache = query_cache_manager.get_cache(stream, user_id).await?; let (start, end) = parse_human_time(start_time, end_time)?; - let key = format!("{}-{}-{}", start.to_rfc3339(), end.to_rfc3339(), query); - let file_path = query_cache.get_file(key); + let file_path = query_cache.get_file(CacheMetadata::new( + query.to_string(), + start.to_rfc3339(), + end.to_rfc3339(), + )); if let Some(file_path) = file_path { let (records, fields) = query_cache.get_cached_records(&file_path).await?; let response = QueryResponse { diff --git a/server/src/querycache.rs b/server/src/querycache.rs index 6d400dd3d..9e18e158c 100644 --- a/server/src/querycache.rs +++ b/server/src/querycache.rs @@ -43,18 +43,28 @@ pub const CURRENT_QUERY_CACHE_VERSION: &str = "v1"; // .cache.json #[derive(Default, Clone, serde::Deserialize, serde::Serialize, Debug, Hash, Eq, PartialEq)] pub struct CacheMetadata { - query: 
String, + pub query: String, pub start_time: String, pub end_time: String, } +impl CacheMetadata { + pub const fn new(query: String, start_time: String, end_time: String) -> Self { + Self { + query, + start_time, + end_time, + } + } +} + #[derive(Debug, serde::Deserialize, serde::Serialize)] pub struct QueryCache { version: String, current_size: u64, /// Mapping between storage path and cache path. - files: Cache, + files: Cache, } impl QueryCache { @@ -66,7 +76,7 @@ impl QueryCache { } } - pub fn get_file(&mut self, key: String) -> Option { + pub fn get_file(&mut self, key: CacheMetadata) -> Option { self.files.get(&key).cloned() } @@ -245,7 +255,7 @@ impl QueryCacheManager { pub async fn move_to_cache( &self, stream: &str, - key: String, + key: CacheMetadata, file_path: &Path, user_id: &str, ) -> Result<(), CacheError> { @@ -309,7 +319,7 @@ impl QueryCacheManager { arrow_writer.close().await?; self.move_to_cache( table_name, - format!("{}-{}-{}", start, end, query), + CacheMetadata::new(query, start, end), &parquet_path, user_id, ) From 2d59640068adcc3f9db0cf77916cd87ff5c337d1 Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Tue, 21 May 2024 16:57:27 +0530 Subject: [PATCH 18/24] add cache endpoints --- server/src/handlers/http.rs | 2 +- server/src/handlers/http/ingest.rs | 4 +++ .../src/handlers/http/modal/query_server.rs | 1 + server/src/handlers/http/modal/server.rs | 14 +++++++++ server/src/localcache.rs | 2 ++ server/src/querycache.rs | 29 +++++++++++++++++++ server/src/rbac/role.rs | 4 +++ 7 files changed, 55 insertions(+), 1 deletion(-) diff --git a/server/src/handlers/http.rs b/server/src/handlers/http.rs index 200ba885e..211d7d17d 100644 --- a/server/src/handlers/http.rs +++ b/server/src/handlers/http.rs @@ -26,6 +26,7 @@ use crate::option::CONFIG; use self::{cluster::get_ingestor_info, query::Query}; pub(crate) mod about; +mod cache; pub mod cluster; pub(crate) mod health_check; pub(crate) mod ingest; @@ -40,7 +41,6 @@ pub(crate) mod query; pub(crate) mod rbac; pub(crate) mod role; pub mod users; - pub const MAX_EVENT_PAYLOAD_SIZE: usize = 10485760; pub const API_BASE_PATH: &str = "api"; pub const API_VERSION: &str = "v1"; diff --git a/server/src/handlers/http/ingest.rs b/server/src/handlers/http/ingest.rs index 25750ea8f..8b1db7a5d 100644 --- a/server/src/handlers/http/ingest.rs +++ b/server/src/handlers/http/ingest.rs @@ -30,6 +30,7 @@ use crate::handlers::{ LOG_SOURCE_KEY, LOG_SOURCE_KINESIS, LOG_SOURCE_OTEL, PREFIX_META, PREFIX_TAGS, SEPARATOR, STREAM_NAME_HEADER_KEY, }; +use crate::localcache::CacheError; use crate::metadata::{self, STREAM_INFO}; use crate::option::{Mode, CONFIG}; use crate::storage::{LogStream, ObjectStorageError}; @@ -445,6 +446,8 @@ pub enum PostError { FiltersError(#[from] FiltersError), #[error("Error: {0}")] DashboardError(#[from] DashboardError), + #[error("Error: {0}")] + CacheError(#[from] CacheError), } impl actix_web::ResponseError for PostError { @@ -465,6 +468,7 @@ impl actix_web::ResponseError for PostError { PostError::DashboardError(_) => StatusCode::INTERNAL_SERVER_ERROR, PostError::FiltersError(_) => StatusCode::INTERNAL_SERVER_ERROR, PostError::Error(_) => StatusCode::INTERNAL_SERVER_ERROR, + PostError::CacheError(_) => StatusCode::INTERNAL_SERVER_ERROR, } } diff --git a/server/src/handlers/http/modal/query_server.rs b/server/src/handlers/http/modal/query_server.rs index a1ba35fa6..7d0fbc08d 100644 --- a/server/src/handlers/http/modal/query_server.rs +++ b/server/src/handlers/http/modal/query_server.rs @@ -121,6 +121,7 @@ impl 
QueryServer { web::scope(&base_path()) // POST "/query" ==> Get results of the SQL query passed in request body .service(Server::get_query_factory()) + .service(Server::get_cache_webscope()) .service(Server::get_liveness_factory()) .service(Server::get_readiness_factory()) .service(Server::get_about_factory()) diff --git a/server/src/handlers/http/modal/server.rs b/server/src/handlers/http/modal/server.rs index c85bd1323..ad14750b4 100644 --- a/server/src/handlers/http/modal/server.rs +++ b/server/src/handlers/http/modal/server.rs @@ -21,6 +21,7 @@ use crate::banner; use crate::handlers; use crate::handlers::http::about; use crate::handlers::http::base_path; +use crate::handlers::http::cache; use crate::handlers::http::health_check; use crate::handlers::http::query; use crate::handlers::http::users::dashboards; @@ -137,6 +138,7 @@ impl Server { web::scope(&base_path()) // POST "/query" ==> Get results of the SQL query passed in request body .service(Self::get_query_factory()) + .service(Self::get_cache_webscope()) .service(Self::get_ingest_factory()) .service(Self::get_liveness_factory()) .service(Self::get_readiness_factory()) @@ -218,6 +220,18 @@ impl Server { web::resource("/query").route(web::post().to(query::query).authorize(Action::Query)) } + pub fn get_cache_webscope() -> Scope { + web::scope("/cache").service( + web::scope("/{user_id}").service( + web::scope("/{stream}").service( + web::resource("") + .route(web::get().to(cache::list).authorize(Action::ListCache)) + .route(web::post().to(cache::remove).authorize(Action::RemoveCache)), + ), + ), + ) + } + // get the logstream web scope pub fn get_logstream_webscope() -> Scope { web::scope("/logstream") diff --git a/server/src/localcache.rs b/server/src/localcache.rs index 22b817646..c1e484362 100644 --- a/server/src/localcache.rs +++ b/server/src/localcache.rs @@ -261,4 +261,6 @@ pub enum CacheError { ParquetError(#[from] ParquetError), #[error("{0}")] MetadataError(#[from] MetadataError), + #[error("Error: Cache File Does Not Exist")] + DoesNotExist, } diff --git a/server/src/querycache.rs b/server/src/querycache.rs index 9e18e158c..24b78c5d5 100644 --- a/server/src/querycache.rs +++ b/server/src/querycache.rs @@ -80,6 +80,18 @@ impl QueryCache { self.files.get(&key).cloned() } + pub fn current_size(&self) -> u64 { + self.current_size + } + + pub fn remove(&mut self, key: CacheMetadata) -> Option { + self.files.remove(&key) + } + + pub fn queries(&self) -> Vec<&CacheMetadata> { + self.files.keys().collect_vec() + } + // read the parquet // return the recordbatches pub async fn get_cached_records( @@ -238,6 +250,23 @@ impl QueryCacheManager { Ok(cache) } + pub async fn remove_from_cache( + &self, + key: CacheMetadata, + stream: &str, + user_id: &str, + ) -> Result<(), CacheError> { + let mut cache = self.get_cache(stream, user_id).await?; + + if let Some(remove_result) = cache.remove(key) { + self.put_cache(stream, &cache, user_id).await?; + tokio::spawn(fs::remove_file(remove_result)); + Ok(()) + } else { + Err(CacheError::DoesNotExist) + } + } + pub async fn put_cache( &self, stream: &str, diff --git a/server/src/rbac/role.rs b/server/src/rbac/role.rs index 6253c4a8e..accbd471a 100644 --- a/server/src/rbac/role.rs +++ b/server/src/rbac/role.rs @@ -58,6 +58,8 @@ pub enum Action { GetFilter, CreateFilter, DeleteFilter, + ListCache, + RemoveCache, } #[derive(Debug, Clone, PartialEq, Eq, Hash)] @@ -123,6 +125,8 @@ impl RoleBuilder { | Action::ListFilter | Action::CreateFilter | Action::DeleteFilter + | Action::ListCache + | 
Action::RemoveCache | Action::GetAnalytics => Permission::Unit(action), Action::Ingest | Action::GetSchema From fd6422d7d5fdaae0391b32b15be7f64a3ae4f5f2 Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Wed, 22 May 2024 07:36:45 +0530 Subject: [PATCH 19/24] update .gitignore to exclude directory called cache .gitignore was excluding anything that matched the pattern `cache*` causing the cache module to not be included --- .gitignore | 2 +- server/src/handlers/http/cache.rs | 95 +++++++++++++++++++++++++++++++ 2 files changed, 96 insertions(+), 1 deletion(-) create mode 100644 server/src/handlers/http/cache.rs diff --git a/.gitignore b/.gitignore index bf69b23aa..5bbc1b194 100644 --- a/.gitignore +++ b/.gitignore @@ -13,5 +13,5 @@ env-file parseable parseable_* parseable-env-secret -cache* +cache diff --git a/server/src/handlers/http/cache.rs b/server/src/handlers/http/cache.rs new file mode 100644 index 000000000..746746d25 --- /dev/null +++ b/server/src/handlers/http/cache.rs @@ -0,0 +1,95 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +use actix_web::{web, HttpRequest, HttpResponse, Responder}; +use anyhow::anyhow; +use bytes::Bytes; +use http::StatusCode; +use serde_json::json; + +use crate::{ + option::CONFIG, + querycache::{CacheMetadata, QueryCacheManager}, +}; + +use super::ingest::PostError; + +pub async fn list(req: HttpRequest) -> Result { + let stream = req + .match_info() + .get("stream") + .ok_or_else(|| PostError::Invalid(anyhow!("Invalid Stream Name in resource path")))?; + + let user_id = req + .match_info() + .get("user_id") + .ok_or_else(|| PostError::Invalid(anyhow!("Invalid User ID not in Resource path")))?; + + let query_cache_manager = QueryCacheManager::global(CONFIG.parseable.query_cache_size) + .await + .unwrap_or(None); + + if let Some(query_cache_manager) = query_cache_manager { + let cache = query_cache_manager + .get_cache(stream, user_id) + .await + .map_err(PostError::CacheError)?; + + let size = cache.current_size(); + let queries = cache.queries(); + + let out = json!({ + "current_cache_size": size, + "cache": queries + }); + + Ok((web::Json(out), StatusCode::OK)) + } else { + Err(PostError::Invalid(anyhow!( + "Query Caching is not active on server " + ))) + } +} + +pub async fn remove(req: HttpRequest, body: Bytes) -> Result { + let stream = req + .match_info() + .get("stream") + .ok_or_else(|| PostError::Invalid(anyhow!("Invalid Stream Name in resource path")))?; + + let user_id = req + .match_info() + .get("user_id") + .ok_or_else(|| PostError::Invalid(anyhow!("Invalid User ID not in Resource path")))?; + + let query = serde_json::from_slice::(&body)?; + + let query_cache_manager = QueryCacheManager::global(CONFIG.parseable.query_cache_size) + .await + .unwrap_or(None); + + if let Some(query_cache_manager) = query_cache_manager { + query_cache_manager + .remove_from_cache(query, stream, 
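For reference, a successful response from the list handler above, rebuilt with serde_json; the field names mirror the handler's `json!` call, while the size and entries are made up for illustration:

use serde_json::json;

fn main() {
    // Mirrors `json!({ "current_cache_size": size, "cache": queries })`,
    // where `queries` serializes each CacheMetadata key in the LRU map.
    let out = json!({
        "current_cache_size": 1048576u64,
        "cache": [
            {
                "query": "select * from mystream",
                "start_time": "2024-05-21T00:00:00Z",
                "end_time": "2024-05-21T01:00:00Z"
            }
        ]
    });
    println!("{}", serde_json::to_string_pretty(&out).unwrap());
}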
user_id) + .await?; + + Ok(HttpResponse::Ok().finish()) + } else { + Err(PostError::Invalid(anyhow!("Query Caching is not enabled"))) + } +} From 77a72417f12bebd67dd5429d3db8e7d13d8629d6 Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Thu, 23 May 2024 12:13:47 +0530 Subject: [PATCH 20/24] fix bug if user id is not provided and query caching is enabled Request hung up if the user_id was not provided in the header, of the query request and query caching was enabled. --- server/src/handlers/airplane.rs | 6 ++++-- server/src/handlers/http/query.rs | 18 +++++++++++++----- server/src/localcache.rs | 2 ++ server/src/querycache.rs | 2 +- 4 files changed, 20 insertions(+), 8 deletions(-) diff --git a/server/src/handlers/airplane.rs b/server/src/handlers/airplane.rs index 8ab7270f5..36b54d435 100644 --- a/server/src/handlers/airplane.rs +++ b/server/src/handlers/airplane.rs @@ -242,7 +242,7 @@ impl FlightService for AirServiceImpl { .await .map_err(|err| Status::internal(err.to_string()))?; - put_results_in_cache( + if let Err(err) = put_results_in_cache( cache_results, user_id, query_cache_manager, @@ -252,7 +252,9 @@ impl FlightService for AirServiceImpl { query.end.to_rfc3339(), ticket.query, ) - .await; + .await { + log::error!("{}", err); + }; /* * INFO: No returning the schema with the data. diff --git a/server/src/handlers/http/query.rs b/server/src/handlers/http/query.rs index be46788e1..1ea4b3404 100644 --- a/server/src/handlers/http/query.rs +++ b/server/src/handlers/http/query.rs @@ -131,7 +131,7 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result Result Result<(), QueryError> { match (cache_results, query_cache_manager) { (Some(_), None) => { log::warn!( "Instructed to cache query results but Query Caching is not Enabled in Server" ); + + Ok(()) } // do cache (Some(_), Some(query_cache_manager)) => { - let user_id = user_id.expect("User Id was provided"); + let user_id = user_id.ok_or(CacheError::Other("User Id not provided"))?; if let Err(err) = query_cache_manager .create_parquet_cache(stream, records, user_id, start, end, query) @@ -209,8 +213,12 @@ pub async fn put_results_in_cache( log::error!("Error Clearing Unwanted files from cache dir"); } } + // fallthrough + Ok(()) + } + (None, _) => { + Ok(()) } - (None, _) => {} } } diff --git a/server/src/localcache.rs b/server/src/localcache.rs index c1e484362..07ee3eaaa 100644 --- a/server/src/localcache.rs +++ b/server/src/localcache.rs @@ -263,4 +263,6 @@ pub enum CacheError { MetadataError(#[from] MetadataError), #[error("Error: Cache File Does Not Exist")] DoesNotExist, + #[error("Error: {0}")] + Other(&'static str), } diff --git a/server/src/querycache.rs b/server/src/querycache.rs index 24b78c5d5..171e6a912 100644 --- a/server/src/querycache.rs +++ b/server/src/querycache.rs @@ -151,7 +151,7 @@ impl QueryCacheManager { &format!( "{}.{}.parquet", hostname_unchecked(), - Utc::now().to_rfc3339() + Utc::now().timestamp().to_string() ), ]) } From 341ee6ed2e86ea4ff0c569224d379b5ee655d833 Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Thu, 23 May 2024 12:23:42 +0530 Subject: [PATCH 21/24] add value check for the show_cached and cache_results headers --- server/src/handlers/http/query.rs | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/server/src/handlers/http/query.rs b/server/src/handlers/http/query.rs index 1ea4b3404..2d136b628 100644 --- a/server/src/handlers/http/query.rs +++ b/server/src/handlers/http/query.rs @@ -197,7 +197,13 @@ pub async fn put_results_in_cache( 
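The core of the hang fix above is converting the Option into a typed error instead of calling expect, so a missing user_id yields an error response rather than a panic inside the async handler. A minimal self-contained sketch, with the error enum standing in for CacheError::Other:

#[derive(Debug, thiserror::Error)]
enum CacheError {
    #[error("Error: {0}")]
    Other(&'static str),
}

fn resolve_user(user_id: Option<&str>) -> Result<&str, CacheError> {
    // Before: user_id.expect("User Id was provided") panicked when None,
    // which left the in-flight request unanswered.
    // After: the caller gets an Err it can log and turn into a response.
    user_id.ok_or(CacheError::Other("User Id not provided"))
}

fn main() {
    assert!(resolve_user(None).is_err());
    assert_eq!(resolve_user(Some("alice")).unwrap(), "alice");
}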
Ok(()) } // do cache - (Some(_), Some(query_cache_manager)) => { + (Some(should_cache), Some(query_cache_manager)) => { + + if should_cache != "true" { log::error!("value of cache results header is false"); return Err(QueryError::CacheError(CacheError::Other("should not cache results"))); + } + let user_id = user_id.ok_or(CacheError::Other("User Id not provided"))?; if let Err(err) = query_cache_manager @@ -222,9 +224,7 @@ pub async fn put_results_in_cache( // fallthrough Ok(()) } - (None, _) => { - Ok(()) - } + (None, _) => Ok(()), } } @@ -241,7 +247,7 @@ pub async fn get_results_from_cache( ); None } - (Some(_), Some(query_cache_manager)) => { + (Some(should_show), Some(query_cache_manager)) => { + if should_show != "true" { + log::error!("value of show cached header is false"); + return Err(QueryError::CacheError(CacheError::Other("should not return cached results"))); + } + let user_id = user_id.ok_or_else(|| QueryError::Anyhow(anyhow!("User Id not provided")))?;
From a88356c2186eb0580eac42d7d48370249831da36 Mon Sep 17 00:00:00 2001
From: Eshan Chatterjee
Date: Thu, 23 May 2024 13:32:57 +0530
Subject: [PATCH 22/24] clean up for better readability

--- server/src/handlers/airplane.rs | 3 ++- server/src/handlers/http/cache.rs | 4 ++-- server/src/handlers/http/ingest.rs | 3 --- server/src/handlers/http/modal/query_server.rs | 2 +- server/src/handlers/http/query.rs | 16 +++++++++------- server/src/querycache.rs | 11 +++++------ 6 files changed, 19 insertions(+), 20 deletions(-)

diff --git a/server/src/handlers/airplane.rs index 36b54d435..d6b814615 100644 --- a/server/src/handlers/airplane.rs +++ b/server/src/handlers/airplane.rs @@ -252,7 +252,8 @@ impl FlightService for AirServiceImpl { query.end.to_rfc3339(), ticket.query, ) - .await { + .await + { log::error!("{}", err); }; diff --git a/server/src/handlers/http/cache.rs index 746746d25..29efd09e4 100644 --- a/server/src/handlers/http/cache.rs +++ b/server/src/handlers/http/cache.rs @@ -50,11 +50,11 @@ pub async fn list(req: HttpRequest) -> Result { .await .map_err(PostError::CacheError)?; - let size = cache.current_size(); + let size = cache.used_cache_size(); let queries = cache.queries(); let out = json!({ - "current_cache_size": size, + "used_capacity": size, "cache": queries }); diff --git a/server/src/handlers/http/ingest.rs index 8b1db7a5d..2cb2ffe13 100644 --- a/server/src/handlers/http/ingest.rs +++ b/server/src/handlers/http/ingest.rs @@ -433,8 +433,6 @@ pub enum PostError { Invalid(#[from] anyhow::Error), #[error("{0}")] CreateStream(#[from] CreateStreamError), - #[error("Error: {0}")] - Error(std::io::Error), #[allow(unused)] #[error("Error: {0}")] CustomError(String), @@ -467,7 +465,6 @@ impl actix_web::ResponseError for PostError { PostError::ObjectStorageError(_) => StatusCode::INTERNAL_SERVER_ERROR, PostError::DashboardError(_) => StatusCode::INTERNAL_SERVER_ERROR, PostError::FiltersError(_) => StatusCode::INTERNAL_SERVER_ERROR, - PostError::Error(_) => StatusCode::INTERNAL_SERVER_ERROR, PostError::CacheError(_) => StatusCode::INTERNAL_SERVER_ERROR, } } diff --git a/server/src/handlers/http/modal/query_server.rs index 7d0fbc08d..e13cd4f14 100644 --- a/server/src/handlers/http/modal/query_server.rs +++ b/server/src/handlers/http/modal/query_server.rs @@ -22,9 +22,9 @@ use crate::handlers::http::middleware::RouteExt; use crate::handlers::http::{base_path, cross_origin_config, API_BASE_PATH, API_VERSION}; use crate::rbac::role::Action; +use crate::sync; use crate::users::dashboards::DASHBOARDS; use 
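The value checks introduced above accept only the literal string "true", so a present-but-false header is rejected instead of being treated as an opt-in. A sketch of the same gate as a helper; the header name here is illustrative, not the server's actual header:

use actix_web::HttpRequest;

fn cache_results_requested(req: &HttpRequest) -> Option<&str> {
    req.headers()
        .get("x-p-cache-results")
        .and_then(|v| v.to_str().ok())
}

fn should_cache(req: &HttpRequest) -> Result<bool, &'static str> {
    match cache_results_requested(req) {
        None => Ok(false),                          // header absent: no caching
        Some("true") => Ok(true),                   // explicit opt-in
        Some(_) => Err("should not cache results"), // present but not "true"
    }
}

fn main() {
    let req = actix_web::test::TestRequest::default()
        .insert_header(("x-p-cache-results", "false"))
        .to_http_request();
    assert!(should_cache(&req).is_err());
}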
crate::users::filters::FILTERS; -use crate::sync; use crate::{analytics, banner, metadata, metrics, migration, rbac, storage}; use actix_web::web; use actix_web::web::ServiceConfig; diff --git a/server/src/handlers/http/query.rs b/server/src/handlers/http/query.rs index 2d136b628..b4793e5c3 100644 --- a/server/src/handlers/http/query.rs +++ b/server/src/handlers/http/query.rs @@ -141,7 +141,8 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result { - if should_cache != "true" { log::error!("value of cache results header is false"); - return Err(QueryError::CacheError(CacheError::Other("should not cache results"))); + return Err(QueryError::CacheError(CacheError::Other( + "should not cache results", + ))); } let user_id = user_id.ok_or(CacheError::Other("User Id not provided"))?; @@ -222,9 +224,7 @@ pub async fn put_results_in_cache( // fallthrough Ok(()) } - (None, _) => { - Ok(()) - } + (None, _) => Ok(()), } } @@ -250,7 +250,9 @@ pub async fn get_results_from_cache( (Some(should_show), Some(query_cache_manager)) => { if should_show != "true" { log::error!("value of show cached header is false"); - return Err(QueryError::CacheError(CacheError::Other("should not return cached results"))); + return Err(QueryError::CacheError(CacheError::Other( + "should not return cached results", + ))); } let user_id = diff --git a/server/src/querycache.rs b/server/src/querycache.rs index 171e6a912..3c1c9483e 100644 --- a/server/src/querycache.rs +++ b/server/src/querycache.rs @@ -40,7 +40,6 @@ pub const QUERY_CACHE_FILENAME: &str = ".cache.json"; pub const QUERY_CACHE_META_FILENAME: &str = ".cache_meta.json"; pub const CURRENT_QUERY_CACHE_VERSION: &str = "v1"; -// .cache.json #[derive(Default, Clone, serde::Deserialize, serde::Serialize, Debug, Hash, Eq, PartialEq)] pub struct CacheMetadata { pub query: String, @@ -80,7 +79,7 @@ impl QueryCache { self.files.get(&key).cloned() } - pub fn current_size(&self) -> u64 { + pub fn used_cache_size(&self) -> u64 { self.current_size } @@ -137,7 +136,7 @@ impl QueryCacheMeta { pub struct QueryCacheManager { filesystem: LocalFileSystem, cache_path: PathBuf, // refers to the path passed in the env var - cache_capacity: u64, + total_cache_capacity: u64, semaphore: Mutex<()>, } @@ -151,7 +150,7 @@ impl QueryCacheManager { &format!( "{}.{}.parquet", hostname_unchecked(), - Utc::now().timestamp().to_string() + Utc::now().timestamp() ), ]) } @@ -172,7 +171,7 @@ impl QueryCacheManager { Self { filesystem: LocalFileSystem::new(), cache_path, - cache_capacity: CONFIG.parseable.query_cache_size, + total_cache_capacity: CONFIG.parseable.query_cache_size, semaphore: Mutex::new(()), } }); @@ -292,7 +291,7 @@ impl QueryCacheManager { let file_size = std::fs::metadata(file_path)?.len(); let mut cache = self.get_cache(stream, user_id).await?; - while cache.current_size + file_size > self.cache_capacity { + while cache.current_size + file_size > self.total_cache_capacity { if let Some((_, file_for_removal)) = cache.files.pop_lru() { let lru_file_size = fs::metadata(&file_for_removal).await?.len(); cache.current_size = cache.current_size.saturating_sub(lru_file_size); From 54db5ef6fdd3eae1f61fb42fc2884017fd374e91 Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Fri, 24 May 2024 11:52:25 +0530 Subject: [PATCH 23/24] fix: the issue with caching data --- server/src/querycache.rs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/server/src/querycache.rs b/server/src/querycache.rs index 3c1c9483e..304ba4bc2 100644 --- 
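The caching fix above derives the Parquet schema from the first RecordBatch and returns early when there is nothing to write, since a writer opened with the stream-wide schema can disagree with the batches actually produced by the query. A minimal sketch of that guard:

use arrow_array::RecordBatch;
use arrow_schema::SchemaRef;

fn schema_for_cache(records: &[RecordBatch]) -> Option<SchemaRef> {
    // `None` tells the caller to skip creating a cache file entirely.
    records.first().map(|record| record.schema())
}

fn main() {
    let batches: Vec<RecordBatch> = Vec::new();
    // Empty result set: nothing to cache, return early without a file.
    assert!(schema_for_cache(&batches).is_none());
}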
a/server/src/querycache.rs +++ b/server/src/querycache.rs @@ -332,9 +332,17 @@ impl QueryCacheManager { let time_partition = STREAM_INFO.get_time_partition(table_name)?; let props = parquet_writer_props(time_partition.clone(), 0, HashMap::new()).build(); + let sch = if let Some(record) = records.first() { + record.schema() + } else { + // the record batch is empty, do not cache and return early + return Ok(()); + }; + + let mut arrow_writer = AsyncArrowWriter::try_new( parquet_file, - STREAM_INFO.schema(table_name).expect("schema present"), + sch, Some(props), )?; From 17d07d6cee7cf07c12eb3edbd29ab9e7a1b06635 Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Fri, 24 May 2024 12:23:13 +0530 Subject: [PATCH 24/24] fix: issue creating multiple cache files --- server/src/handlers/http/query.rs | 11 ++++++++++- server/src/querycache.rs | 24 +++++++++++++----------- 2 files changed, 23 insertions(+), 12 deletions(-) diff --git a/server/src/handlers/http/query.rs b/server/src/handlers/http/query.rs index b4793e5c3..576f838fe 100644 --- a/server/src/handlers/http/query.rs +++ b/server/src/handlers/http/query.rs @@ -207,6 +207,15 @@ pub async fn put_results_in_cache( } let user_id = user_id.ok_or(CacheError::Other("User Id not provided"))?; + let mut cache = query_cache_manager.get_cache(stream, user_id).await?; + + let cache_key = CacheMetadata::new(query.clone(), start.clone(), end.clone()); + + // guard to stop multiple caching of the same content + if let Some(path) = cache.get_file(&cache_key) { + log::info!("File already exists in cache, Removing old file"); + cache.delete(&cache_key, path).await?; + } if let Err(err) = query_cache_manager .create_parquet_cache(stream, records, user_id, start, end, query) @@ -262,7 +271,7 @@ pub async fn get_results_from_cache( let (start, end) = parse_human_time(start_time, end_time)?; - let file_path = query_cache.get_file(CacheMetadata::new( + let file_path = query_cache.get_file(&CacheMetadata::new( query.to_string(), start.to_rfc3339(), end.to_rfc3339(), diff --git a/server/src/querycache.rs b/server/src/querycache.rs index 304ba4bc2..4086de7b8 100644 --- a/server/src/querycache.rs +++ b/server/src/querycache.rs @@ -75,16 +75,23 @@ impl QueryCache { } } - pub fn get_file(&mut self, key: CacheMetadata) -> Option { - self.files.get(&key).cloned() + pub fn get_file(&mut self, key: &CacheMetadata) -> Option { + self.files.get(key).cloned() } pub fn used_cache_size(&self) -> u64 { self.current_size } - pub fn remove(&mut self, key: CacheMetadata) -> Option { - self.files.remove(&key) + pub fn remove(&mut self, key: &CacheMetadata) -> Option { + self.files.remove(key) + } + + pub async fn delete(&mut self, key: &CacheMetadata, path: PathBuf) -> Result<(), CacheError> { + self.files.delete(key); + AsyncFs::remove_file(path).await?; + + Ok(()) } pub fn queries(&self) -> Vec<&CacheMetadata> { @@ -257,7 +264,7 @@ impl QueryCacheManager { ) -> Result<(), CacheError> { let mut cache = self.get_cache(stream, user_id).await?; - if let Some(remove_result) = cache.remove(key) { + if let Some(remove_result) = cache.remove(&key) { self.put_cache(stream, &cache, user_id).await?; tokio::spawn(fs::remove_file(remove_result)); Ok(()) @@ -339,12 +346,7 @@ impl QueryCacheManager { return Ok(()); }; - - let mut arrow_writer = AsyncArrowWriter::try_new( - parquet_file, - sch, - Some(props), - )?; + let mut arrow_writer = AsyncArrowWriter::try_new(parquet_file, sch, Some(props))?; for record in records { if let Err(e) = arrow_writer.write(record).await {
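The duplicate-file guard in the final patch looks up the existing cache entry for a key and deletes both the map entry and the old Parquet file before writing a replacement. A simplified sketch of that flow, with a HashMap and a tuple key standing in for the LRU-backed QueryCache and CacheMetadata:

use std::collections::HashMap;
use std::path::PathBuf;

type CacheMetadata = (String, String, String); // (query, start, end) stand-in

struct QueryCache {
    files: HashMap<CacheMetadata, PathBuf>,
}

impl QueryCache {
    async fn delete(&mut self, key: &CacheMetadata, path: PathBuf) -> std::io::Result<()> {
        // Drop the map entry and the file it pointed at.
        self.files.remove(key);
        tokio::fs::remove_file(path).await
    }

    async fn upsert(&mut self, key: CacheMetadata, new_path: PathBuf) -> std::io::Result<()> {
        // Guard against multiple caching of the same content: a repeated
        // identical query replaces its old file instead of leaking it.
        if let Some(old) = self.files.get(&key).cloned() {
            self.delete(&key, old).await?;
        }
        self.files.insert(key, new_path);
        Ok(())
    }
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let mut cache = QueryCache { files: HashMap::new() };
    let key = ("select 1".to_owned(), "start".to_owned(), "end".to_owned());
    cache.upsert(key, PathBuf::from("/tmp/query-cache/0.parquet")).await?;
    Ok(())
}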