diff --git a/.gitignore b/.gitignore index bf69b23aa..5bbc1b194 100644 --- a/.gitignore +++ b/.gitignore @@ -13,5 +13,5 @@ env-file parseable parseable_* parseable-env-secret -cache* +cache diff --git a/server/Cargo.toml b/server/Cargo.toml index 5d9eb69e2..9ca5b794b 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -9,13 +9,14 @@ build = "build.rs" [dependencies] ### apache arrow/datafusion dependencies +# arrow = "51.0.0" arrow-schema = { version = "51.0.0", features = ["serde"] } arrow-array = { version = "51.0.0" } arrow-json = "51.0.0" arrow-ipc = { version = "51.0.0", features = ["zstd"] } arrow-select = "51.0.0" datafusion = "37.1.0" -object_store = { version = "0.9.1", features = ["cloud", "aws"] } +object_store = { version = "0.9.1", features = ["cloud", "aws"] } # cannot update object_store as datafusion has not caught up parquet = "51.0.0" arrow-flight = { version = "51.0.0", features = [ "tls" ] } tonic = {version = "0.11.0", features = ["tls", "transport", "gzip", "zstd"] } @@ -72,11 +73,11 @@ relative-path = { version = "1.7", features = ["serde"] } reqwest = { version = "0.11.27", default_features = false, features = [ "rustls-tls", "json", -] } -rustls = "0.22.4" +] } # cannot update because rustls is not the latest, see `rustls` below +rustls = "0.22.4" # cannot update to 0.23, actix has not caught up yet rustls-pemfile = "2.1.2" semver = "1.0" -serde = { version = "1.0", features = ["rc"] } +serde = { version = "1.0", features = ["rc", "derive"] } serde_json = "1.0" static-files = "0.2" sysinfo = "0.30.11" diff --git a/server/src/catalog.rs b/server/src/catalog.rs index c88d4c9a5..304752ccf 100644 --- a/server/src/catalog.rs +++ b/server/src/catalog.rs @@ -230,8 +230,7 @@ async fn create_manifest( .ok_or(IOError::new( ErrorKind::Other, "Failed to create upper bound for manifest", - )) - .map_err(ObjectStorageError::IoError)?, + ))?, ) .and_utc(); diff --git a/server/src/cli.rs b/server/src/cli.rs index cd3f8cf7a..961274035 100644 --- a/server/src/cli.rs +++ b/server/src/cli.rs @@ -92,6 +92,12 @@ pub struct Cli { /// port used by airplane (flight query service) pub flight_port: u16, + + /// local path to cache query results + pub query_cache_path: Option<PathBuf>, + + /// Size for the local query cache + pub query_cache_size: u64, } impl Cli { @@ -102,6 +108,8 @@ impl Cli { pub const DOMAIN_URI: &'static str = "origin"; pub const STAGING: &'static str = "local-staging-path"; pub const CACHE: &'static str = "cache-path"; + pub const QUERY_CACHE: &'static str = "query-cache-path"; + pub const QUERY_CACHE_SIZE: &'static str = "query-cache-size"; pub const CACHE_SIZE: &'static str = "cache-size"; pub const USERNAME: &'static str = "username"; pub const PASSWORD: &'static str = "password"; @@ -191,6 +199,25 @@ impl Cli { .next_line_help(true), ) + .arg( + Arg::new(Self::QUERY_CACHE) + .long(Self::QUERY_CACHE) + .env("P_QUERY_CACHE_DIR") + .value_name("DIR") + .value_parser(validation::canonicalize_path) + .help("Local path on this device to be used for caching query results") + .next_line_help(true), + ) + .arg( + Arg::new(Self::QUERY_CACHE_SIZE) + .long(Self::QUERY_CACHE_SIZE) + .env("P_QUERY_CACHE_SIZE") + .value_name("size") + .default_value("1GiB") + .value_parser(validation::cache_size) + .help("Maximum allowed cache size for all streams combined (in human-readable format, e.g. 1GiB, 2GiB, 100MB)") + .next_line_help(true), + ) .arg( Arg::new(Self::USERNAME) .long(Self::USERNAME) @@ -372,6 +399,7 @@ impl FromArgMatches for Cli { fn update_from_arg_matches(&mut self, m: &clap::ArgMatches) -> Result<(), clap::Error> {
self.local_cache_path = m.get_one::<PathBuf>(Self::CACHE).cloned(); + self.query_cache_path = m.get_one::<PathBuf>(Self::QUERY_CACHE).cloned(); self.tls_cert_path = m.get_one::<PathBuf>(Self::TLS_CERT).cloned(); self.tls_key_path = m.get_one::<PathBuf>(Self::TLS_KEY).cloned(); self.domain_address = m.get_one::<Url>(Self::DOMAIN_URI).cloned(); @@ -394,6 +422,10 @@ impl FromArgMatches for Cli { .get_one::<u64>(Self::CACHE_SIZE) .cloned() .expect("default value for cache size"); + self.query_cache_size = m + .get_one(Self::QUERY_CACHE_SIZE) + .cloned() + .expect("default value for query cache size"); self.username = m .get_one::<String>(Self::USERNAME) .cloned() diff --git a/server/src/event/writer/file_writer.rs b/server/src/event/writer/file_writer.rs index 488f8da8a..0b990421d 100644 --- a/server/src/event/writer/file_writer.rs +++ b/server/src/event/writer/file_writer.rs @@ -49,10 +49,7 @@ impl FileWriter { ) -> Result<(), StreamWriterError> { match self.get_mut(schema_key) { Some(writer) => { - writer - .writer - .write(record) - .map_err(StreamWriterError::Writer)?; + writer.writer.write(record)?; } // entry is not present thus we create it None => { @@ -100,8 +97,6 @@ fn init_new_stream_writer_file( let mut stream_writer = StreamWriter::try_new(file, &record.schema()) .expect("File and RecordBatch both are checked"); - stream_writer - .write(record) - .map_err(StreamWriterError::Writer)?; + stream_writer.write(record)?; Ok((path, stream_writer)) } diff --git a/server/src/handlers.rs b/server/src/handlers.rs index d0c11690f..5d173e1b4 100644 --- a/server/src/handlers.rs +++ b/server/src/handlers.rs @@ -23,6 +23,9 @@ pub mod livetail; const PREFIX_TAGS: &str = "x-p-tag-"; const PREFIX_META: &str = "x-p-meta-"; const STREAM_NAME_HEADER_KEY: &str = "x-p-stream"; +const CACHE_RESULTS_HEADER_KEY: &str = "x-p-cache-results"; +const CACHE_VIEW_HEADER_KEY: &str = "x-p-show-cached"; +const USER_ID_HEADER_KEY: &str = "x-p-user-id"; const LOG_SOURCE_KEY: &str = "x-p-log-source"; const TIME_PARTITION_KEY: &str = "x-p-time-partition"; const TIME_PARTITION_LIMIT_KEY: &str = "x-p-time-partition-limit"; diff --git a/server/src/handlers/airplane.rs b/server/src/handlers/airplane.rs index 8fd83f35d..d6b814615 100644 --- a/server/src/handlers/airplane.rs +++ b/server/src/handlers/airplane.rs @@ -1,5 +1,22 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ * + */ + use arrow_array::RecordBatch; -use arrow_flight::encode::FlightDataEncoderBuilder; use arrow_flight::flight_service_server::FlightServiceServer; use arrow_flight::PollInfo; use arrow_schema::ArrowError; @@ -7,7 +24,6 @@ use arrow_schema::ArrowError; use datafusion::common::tree_node::TreeNode; use serde_json::json; use std::net::SocketAddr; -use std::sync::Arc; use std::time::Instant; use tonic::codec::CompressionEncoding; @@ -16,20 +32,22 @@ use futures_util::{Future, TryFutureExt}; use tonic::transport::{Identity, Server, ServerTlsConfig}; use tonic_web::GrpcWebLayer; -use crate::event::commit_schema; use crate::handlers::http::cluster::get_ingestor_info; -use crate::handlers::http::fetch_schema; +use crate::handlers::{CACHE_RESULTS_HEADER_KEY, CACHE_VIEW_HEADER_KEY, USER_ID_HEADER_KEY}; use crate::metrics::QUERY_EXECUTE_TIME; -use crate::option::{Mode, CONFIG}; +use crate::option::CONFIG; use crate::handlers::livetail::cross_origin_config; -use crate::handlers::http::query::{authorize_and_set_filter_tags, into_query}; +use crate::handlers::http::query::{ + authorize_and_set_filter_tags, into_query, put_results_in_cache, update_schema_when_distributed, +}; use crate::query::{TableScanVisitor, QUERY_SESSION}; -use crate::storage::object_storage::commit_schema_to_storage; +use crate::querycache::QueryCacheManager; use crate::utils::arrow::flight::{ - append_temporary_events, get_query_from_ticket, run_do_get_rpc, send_to_ingester, + append_temporary_events, get_query_from_ticket, into_flight_data, run_do_get_rpc, + send_to_ingester, }; use arrow_flight::{ flight_service_server::FlightService, Action, ActionType, Criteria, Empty, FlightData, @@ -37,13 +55,15 @@ use arrow_flight::{ SchemaResult, Ticket, }; use arrow_ipc::writer::IpcWriteOptions; -use futures::{stream, TryStreamExt}; +use futures::stream; use tonic::{Request, Response, Status, Streaming}; use crate::handlers::livetail::extract_session_key; use crate::metadata::STREAM_INFO; use crate::rbac::Users; +use super::http::query::get_results_from_cache; + #[derive(Clone, Debug)] pub struct AirServiceImpl {} @@ -112,7 +132,7 @@ impl FlightService for AirServiceImpl { async fn do_get(&self, req: Request<Ticket>) -> Result<Response<Self::DoGetStream>, Status> { let key = extract_session_key(req.metadata())?; - let ticket = get_query_from_ticket(req)?; + let ticket = get_query_from_ticket(&req)?; log::info!("query requested to airplane: {:?}", ticket); @@ -132,32 +152,57 @@ impl FlightService for AirServiceImpl { let mut visitor = TableScanVisitor::default(); let _ = raw_logical_plan.visit(&mut visitor); - let tables = visitor.into_inner(); - - if CONFIG.parseable.mode == Mode::Query { - // using http to get the schema. may update to use flight later - for table in tables { - if let Ok(new_schema) = fetch_schema(&table).await { - // commit schema merges the schema internally and updates the schema in storage. - commit_schema_to_storage(&table, new_schema.clone()) - .await - .map_err(|err| Status::internal(err.to_string()))?; - commit_schema(&table, Arc::new(new_schema)) - .map_err(|err| Status::internal(err.to_string()))?; - } - } + let streams = visitor.into_inner(); + + let query_cache_manager = QueryCacheManager::global(CONFIG.parseable.query_cache_size) + .await + .unwrap_or(None); + + let cache_results = req + .metadata() + .get(CACHE_RESULTS_HEADER_KEY) + .and_then(|value| value.to_str().ok()); // I don't think we need to own this.
+ + let show_cached = req + .metadata() + .get(CACHE_VIEW_HEADER_KEY) + .and_then(|value| value.to_str().ok()); + + let user_id = req + .metadata() + .get(USER_ID_HEADER_KEY) + .and_then(|value| value.to_str().ok()); + let stream_name = streams + .first() + .ok_or_else(|| Status::aborted("Malformed SQL Provided, Table Name Not Found"))? + .to_owned(); + + // send the cached results + if let Ok(cache_results) = get_results_from_cache( + show_cached, + query_cache_manager, + &stream_name, + user_id, + &ticket.start_time, + &ticket.end_time, + &ticket.query, + ticket.send_null, + ticket.fields, + ) + .await + { + return cache_results.into_flight(); } + update_schema_when_distributed(streams) + .await + .map_err(|err| Status::internal(err.to_string()))?; + // map payload to query let mut query = into_query(&ticket, &session_state) .await .map_err(|_| Status::internal("Failed to parse query"))?; - // if table name is not present it is a Malformed Query - let stream_name = query - .first_table_name() - .ok_or_else(|| Status::invalid_argument("Malformed Query"))?; - let event = if send_to_ingester(query.start.timestamp_millis(), query.end.timestamp_millis()) { let sql = format!("select * from {}", &stream_name); @@ -192,11 +237,26 @@ impl FlightService for AirServiceImpl { Status::permission_denied("User Does not have permission to access this") })?; let time = Instant::now(); - let (results, _) = query + let (records, _) = query .execute(stream_name.clone()) .await .map_err(|err| Status::internal(err.to_string()))?; + if let Err(err) = put_results_in_cache( + cache_results, + user_id, + query_cache_manager, + &stream_name, + &records, + query.start.to_rfc3339(), + query.end.to_rfc3339(), + ticket.query, + ) + .await + { + log::error!("{}", err); + }; + /* * INFO: Not returning the schema with the data. * kept it in case it needs to be sent in the future.
@@ -208,18 +268,7 @@ impl FlightService for AirServiceImpl { .collect::<Vec<_>>(); let schema = Schema::try_merge(schemas).map_err(|err| Status::internal(err.to_string()))?; */ - let input_stream = futures::stream::iter(results.into_iter().map(Ok)); - let write_options = IpcWriteOptions::default() - .try_with_compression(Some(arrow_ipc::CompressionType(1))) - .map_err(|err| Status::failed_precondition(err.to_string()))?; - - let flight_data_stream = FlightDataEncoderBuilder::new() - .with_max_flight_data_size(usize::MAX) - .with_options(write_options) - // .with_schema(schema.into()) - .build(input_stream); - - let flight_data_stream = flight_data_stream.map_err(|err| Status::unknown(err.to_string())); + let out = into_flight_data(records); if let Some(event) = event { event.clear(&stream_name); @@ -230,9 +279,7 @@ impl FlightService for AirServiceImpl { .with_label_values(&[&format!("flight-query-{}", stream_name)]) .observe(time); - Ok(Response::new( - Box::pin(flight_data_stream) as Self::DoGetStream - )) + out } async fn do_put( diff --git a/server/src/handlers/http.rs b/server/src/handlers/http.rs index 6044b74ac..211d7d17d 100644 --- a/server/src/handlers/http.rs +++ b/server/src/handlers/http.rs @@ -26,6 +26,7 @@ use crate::option::CONFIG; use self::{cluster::get_ingestor_info, query::Query}; pub(crate) mod about; +mod cache; pub mod cluster; pub(crate) mod health_check; pub(crate) mod ingest; @@ -39,7 +40,7 @@ mod otel; pub(crate) mod query; pub(crate) mod rbac; pub(crate) mod role; - +pub mod users; pub const MAX_EVENT_PAYLOAD_SIZE: usize = 10485760; pub const API_BASE_PATH: &str = "api"; pub const API_VERSION: &str = "v1"; diff --git a/server/src/handlers/http/cache.rs b/server/src/handlers/http/cache.rs new file mode 100644 index 000000000..29efd09e4 --- /dev/null +++ b/server/src/handlers/http/cache.rs @@ -0,0 +1,95 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ * + */ + +use actix_web::{web, HttpRequest, HttpResponse, Responder}; +use anyhow::anyhow; +use bytes::Bytes; +use http::StatusCode; +use serde_json::json; + +use crate::{ + option::CONFIG, + querycache::{CacheMetadata, QueryCacheManager}, +}; + +use super::ingest::PostError; + +pub async fn list(req: HttpRequest) -> Result<impl Responder, PostError> { + let stream = req + .match_info() + .get("stream") + .ok_or_else(|| PostError::Invalid(anyhow!("Invalid Stream Name in resource path")))?; + + let user_id = req + .match_info() + .get("user_id") + .ok_or_else(|| PostError::Invalid(anyhow!("User ID not provided in resource path")))?; + + let query_cache_manager = QueryCacheManager::global(CONFIG.parseable.query_cache_size) + .await + .unwrap_or(None); + + if let Some(query_cache_manager) = query_cache_manager { + let cache = query_cache_manager + .get_cache(stream, user_id) + .await + .map_err(PostError::CacheError)?; + + let size = cache.used_cache_size(); + let queries = cache.queries(); + + let out = json!({ + "used_capacity": size, + "cache": queries + }); + + Ok((web::Json(out), StatusCode::OK)) + } else { + Err(PostError::Invalid(anyhow!( + "Query Caching is not active on server" + ))) + } +} + +pub async fn remove(req: HttpRequest, body: Bytes) -> Result<impl Responder, PostError> { + let stream = req + .match_info() + .get("stream") + .ok_or_else(|| PostError::Invalid(anyhow!("Invalid Stream Name in resource path")))?; + + let user_id = req + .match_info() + .get("user_id") + .ok_or_else(|| PostError::Invalid(anyhow!("User ID not provided in resource path")))?; + + let query = serde_json::from_slice::<CacheMetadata>(&body)?; + + let query_cache_manager = QueryCacheManager::global(CONFIG.parseable.query_cache_size) + .await + .unwrap_or(None); + + if let Some(query_cache_manager) = query_cache_manager { + query_cache_manager + .remove_from_cache(query, stream, user_id) + .await?; + + Ok(HttpResponse::Ok().finish()) + } else { + Err(PostError::Invalid(anyhow!("Query Caching is not enabled"))) + } +} diff --git a/server/src/handlers/http/ingest.rs b/server/src/handlers/http/ingest.rs index a5157f34c..2cb2ffe13 100644 --- a/server/src/handlers/http/ingest.rs +++ b/server/src/handlers/http/ingest.rs @@ -18,6 +18,8 @@ use super::cluster::INTERNAL_STREAM_NAME; use super::logstream::error::CreateStreamError; +use super::users::dashboards::DashboardError; +use super::users::filters::FiltersError; use super::{kinesis, otel}; use crate::event::{ self, @@ -28,6 +30,7 @@ use crate::handlers::{ LOG_SOURCE_KEY, LOG_SOURCE_KINESIS, LOG_SOURCE_OTEL, PREFIX_META, PREFIX_TAGS, SEPARATOR, STREAM_NAME_HEADER_KEY, }; +use crate::localcache::CacheError; use crate::metadata::{self, STREAM_INFO}; use crate::option::{Mode, CONFIG}; use crate::storage::{LogStream, ObjectStorageError}; @@ -168,7 +171,7 @@ async fn push_logs(stream_name: String, req: HttpRequest, body: Bytes) -> Result let object_store_format = glob_storage .get_object_store_format(&stream_name) .await - .map_err(|_err| PostError::StreamNotFound(stream_name.clone()))?; + .map_err(|_| PostError::StreamNotFound(stream_name.clone()))?; let time_partition = object_store_format.time_partition; let time_partition_limit = object_store_format.time_partition_limit; @@ -430,12 +433,19 @@ pub enum PostError { Invalid(#[from] anyhow::Error), #[error("{0}")] CreateStream(#[from] CreateStreamError), + #[allow(unused)] #[error("Error: {0}")] CustomError(String), #[error("Error: {0}")] NetworkError(#[from] reqwest::Error), #[error("ObjectStorageError: {0}")] ObjectStorageError(#[from] ObjectStorageError), + #[error("Error: 
{0}")] + FiltersError(#[from] FiltersError), + #[error("Error: {0}")] + DashboardError(#[from] DashboardError), + #[error("Error: {0}")] + CacheError(#[from] CacheError), } impl actix_web::ResponseError for PostError { @@ -453,6 +463,9 @@ impl actix_web::ResponseError for PostError { PostError::CustomError(_) => StatusCode::INTERNAL_SERVER_ERROR, PostError::NetworkError(_) => StatusCode::INTERNAL_SERVER_ERROR, PostError::ObjectStorageError(_) => StatusCode::INTERNAL_SERVER_ERROR, + PostError::DashboardError(_) => StatusCode::INTERNAL_SERVER_ERROR, + PostError::FiltersError(_) => StatusCode::INTERNAL_SERVER_ERROR, + PostError::CacheError(_) => StatusCode::INTERNAL_SERVER_ERROR, } } diff --git a/server/src/handlers/http/modal/query_server.rs b/server/src/handlers/http/modal/query_server.rs index 0abfc7ff0..e13cd4f14 100644 --- a/server/src/handlers/http/modal/query_server.rs +++ b/server/src/handlers/http/modal/query_server.rs @@ -23,6 +23,8 @@ use crate::handlers::http::{base_path, cross_origin_config, API_BASE_PATH, API_V use crate::rbac::role::Action; use crate::sync; +use crate::users::dashboards::DASHBOARDS; +use crate::users::filters::FILTERS; use crate::{analytics, banner, metadata, metrics, migration, rbac, storage}; use actix_web::web; use actix_web::web::ServiceConfig; @@ -119,11 +121,14 @@ impl QueryServer { web::scope(&base_path()) // POST "/query" ==> Get results of the SQL query passed in request body .service(Server::get_query_factory()) + .service(Server::get_cache_webscope()) .service(Server::get_liveness_factory()) .service(Server::get_readiness_factory()) .service(Server::get_about_factory()) .service(Server::get_logstream_webscope()) .service(Server::get_user_webscope()) + .service(Server::get_dashboards_webscope()) + .service(Server::get_filters_webscope()) .service(Server::get_llm_webscope()) .service(Server::get_oauth_webscope(oidc_client)) .service(Server::get_user_role_webscope()) @@ -174,6 +179,9 @@ impl QueryServer { log::warn!("could not populate local metadata. 
{:?}", e); } + FILTERS.load().await?; + DASHBOARDS.load().await?; + // load data from stats back to prometheus metrics metrics::fetch_stats_from_storage().await; metrics::reset_daily_metric_from_global(); diff --git a/server/src/handlers/http/modal/server.rs b/server/src/handlers/http/modal/server.rs index 7b89dd9b9..ad14750b4 100644 --- a/server/src/handlers/http/modal/server.rs +++ b/server/src/handlers/http/modal/server.rs @@ -21,8 +21,11 @@ use crate::banner; use crate::handlers; use crate::handlers::http::about; use crate::handlers::http::base_path; +use crate::handlers::http::cache; use crate::handlers::http::health_check; use crate::handlers::http::query; +use crate::handlers::http::users::dashboards; +use crate::handlers::http::users::filters; use crate::handlers::http::API_BASE_PATH; use crate::handlers::http::API_VERSION; use crate::localcache::LocalCacheManager; @@ -32,6 +35,8 @@ use crate::migration; use crate::rbac; use crate::storage; use crate::sync; +use crate::users::dashboards::DASHBOARDS; +use crate::users::filters::FILTERS; use std::sync::Arc; use actix_web::web::resource; @@ -133,12 +138,15 @@ impl Server { web::scope(&base_path()) // POST "/query" ==> Get results of the SQL query passed in request body .service(Self::get_query_factory()) + .service(Self::get_cache_webscope()) .service(Self::get_ingest_factory()) .service(Self::get_liveness_factory()) .service(Self::get_readiness_factory()) .service(Self::get_about_factory()) .service(Self::get_logstream_webscope()) .service(Self::get_user_webscope()) + .service(Self::get_dashboards_webscope()) + .service(Self::get_filters_webscope()) .service(Self::get_llm_webscope()) .service(Self::get_oauth_webscope(oidc_client)) .service(Self::get_user_role_webscope()), @@ -146,11 +154,84 @@ impl Server { .service(Self::get_generated()); } + // get the dashboards web scope + pub fn get_dashboards_webscope() -> Scope { + web::scope("/dashboards").service( + web::scope("/{user_id}") + .service( + web::resource("").route( + web::get() + .to(dashboards::list) + .authorize(Action::ListDashboard), + ), + ) + .service( + web::scope("/{dashboard_id}").service( + web::resource("") + .route( + web::get() + .to(dashboards::get) + .authorize(Action::GetDashboard), + ) + .route( + web::post() + .to(dashboards::post) + .authorize(Action::CreateDashboard), + ) + .route( + web::delete() + .to(dashboards::delete) + .authorize(Action::DeleteDashboard), + ), + ), + ), + ) + } + + // get the filters web scope + pub fn get_filters_webscope() -> Scope { + web::scope("/filters").service( + web::scope("/{user_id}") + .service( + web::resource("") + .route(web::get().to(filters::list).authorize(Action::ListFilter)), + ) + .service( + web::scope("/{filter_id}").service( + web::resource("") + .route(web::get().to(filters::get).authorize(Action::GetFilter)) + .route( + web::post() + .to(filters::post) + .authorize(Action::CreateFilter), + ) + .route( + web::delete() + .to(filters::delete) + .authorize(Action::DeleteFilter), + ), + ), + ), + ) + } + // get the query factory pub fn get_query_factory() -> Resource { web::resource("/query").route(web::post().to(query::query).authorize(Action::Query)) } + pub fn get_cache_webscope() -> Scope { + web::scope("/cache").service( + web::scope("/{user_id}").service( + web::scope("/{stream}").service( + web::resource("") + .route(web::get().to(cache::list).authorize(Action::ListCache)) + .route(web::post().to(cache::remove).authorize(Action::RemoveCache)), + ), + ), + ) + } + // get the logstream web scope pub fn 
get_logstream_webscope() -> Scope { web::scope("/logstream") @@ -411,6 +492,9 @@ impl Server { log::warn!("could not populate local metadata. {:?}", err); } + FILTERS.load().await?; + DASHBOARDS.load().await?; + metrics::fetch_stats_from_storage().await; metrics::reset_daily_metric_from_global(); storage::retention::load_retention_from_global(); diff --git a/server/src/handlers/http/query.rs b/server/src/handlers/http/query.rs index 534db8075..576f838fe 100644 --- a/server/src/handlers/http/query.rs +++ b/server/src/handlers/http/query.rs @@ -19,6 +19,7 @@ use actix_web::http::header::ContentType; use actix_web::web::{self, Json}; use actix_web::{FromRequest, HttpRequest, Responder}; +use anyhow::anyhow; use chrono::{DateTime, Utc}; use datafusion::common::tree_node::TreeNode; use datafusion::error::DataFusionError; @@ -32,13 +33,17 @@ use std::time::Instant; use crate::event::error::EventError; use crate::handlers::http::fetch_schema; +use arrow_array::RecordBatch; use crate::event::commit_schema; +use crate::handlers::{CACHE_RESULTS_HEADER_KEY, CACHE_VIEW_HEADER_KEY, USER_ID_HEADER_KEY}; +use crate::localcache::CacheError; use crate::metrics::QUERY_EXECUTE_TIME; use crate::option::{Mode, CONFIG}; use crate::query::error::ExecuteError; use crate::query::Query as LogicalQuery; use crate::query::{TableScanVisitor, QUERY_SESSION}; +use crate::querycache::{CacheMetadata, QueryCacheManager}; use crate::rbac::role::{Action, Permission}; use crate::rbac::Users; use crate::response::QueryResponse; @@ -72,33 +77,74 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon - let permissions: Vec<Permission> = Users.get_permissions(&creds); + let creds = extract_session_key_from_req(&req)?; + let permissions = Users.get_permissions(&creds); let table_name = query .first_table_name() - .ok_or_else(|| QueryError::MalformedQuery("No table name found in query".to_string()))?; + .ok_or_else(|| QueryError::MalformedQuery("No table name found in query"))?; + authorize_and_set_filter_tags(&mut query, permissions, &table_name)?; let time = Instant::now(); - let (records, fields) = query.execute(table_name.clone()).await?; + // deal with cache saving + if let Err(err) = put_results_in_cache( + cache_results, + user_id, + query_cache_manager, + &table_name, + &records, + query.start.to_rfc3339(), + query.end.to_rfc3339(), + query_request.query, + ) + .await + { + log::error!("{}", err); + }; let response = QueryResponse { records, @@ -117,6 +163,138 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon +pub async fn update_schema_when_distributed(tables: Vec<String>) -> Result<(), QueryError> { + if CONFIG.parseable.mode == Mode::Query { + for table in tables { + if let Ok(new_schema) = fetch_schema(&table).await { + // commit schema merges the schema internally and updates the schema in storage.
+ commit_schema_to_storage(&table, new_schema.clone()).await?; + + commit_schema(&table, Arc::new(new_schema))?; + } + } + } + + Ok(()) +} + +#[allow(clippy::too_many_arguments)] +pub async fn put_results_in_cache( + cache_results: Option<&str>, + user_id: Option<&str>, + query_cache_manager: Option<&QueryCacheManager>, + stream: &str, + records: &[RecordBatch], + start: String, + end: String, + query: String, +) -> Result<(), QueryError> { + match (cache_results, query_cache_manager) { + (Some(_), None) => { + log::warn!( + "Instructed to cache query results but Query Caching is not Enabled in Server" + ); + + Ok(()) + } + // do cache + (Some(should_cache), Some(query_cache_manager)) => { + if should_cache != "true" { + log::error!("value of cache results header is false"); + return Err(QueryError::CacheError(CacheError::Other( + "should not cache results", + ))); + } + + let user_id = user_id.ok_or(CacheError::Other("User Id not provided"))?; + let mut cache = query_cache_manager.get_cache(stream, user_id).await?; + + let cache_key = CacheMetadata::new(query.clone(), start.clone(), end.clone()); + + // guard to stop multiple caching of the same content + if let Some(path) = cache.get_file(&cache_key) { + log::info!("File already exists in cache, Removing old file"); + cache.delete(&cache_key, path).await?; + } + + if let Err(err) = query_cache_manager + .create_parquet_cache(stream, records, user_id, start, end, query) + .await + { + log::error!("Error occurred while caching query results: {:?}", err); + if query_cache_manager + .clear_cache(stream, user_id) + .await + .is_err() + { + log::error!("Error Clearing Unwanted files from cache dir"); + } + } + // fallthrough + Ok(()) + } + (None, _) => Ok(()), + } +} + +#[allow(clippy::too_many_arguments)] +pub async fn get_results_from_cache( + show_cached: Option<&str>, + query_cache_manager: Option<&QueryCacheManager>, + stream: &str, + user_id: Option<&str>, + start_time: &str, + end_time: &str, + query: &str, + send_null: bool, + send_fields: bool, +) -> Result<QueryResponse, QueryError> { + match (show_cached, query_cache_manager) { + (Some(_), None) => { + log::warn!( + "Instructed to show cached results but Query Caching is not Enabled on Server" + ); + None + } + (Some(should_show), Some(query_cache_manager)) => { + if should_show != "true" { + log::error!("value of show cached header is false"); + return Err(QueryError::CacheError(CacheError::Other( + "should not return cached results", + ))); + } + + let user_id = + user_id.ok_or_else(|| QueryError::Anyhow(anyhow!("User Id not provided")))?; + + let mut query_cache = query_cache_manager.get_cache(stream, user_id).await?; + + let (start, end) = parse_human_time(start_time, end_time)?; + + let file_path = query_cache.get_file(&CacheMetadata::new( + query.to_string(), + start.to_rfc3339(), + end.to_rfc3339(), + )); + if let Some(file_path) = file_path { + let (records, fields) = query_cache.get_cached_records(&file_path).await?; + let response = QueryResponse { + records, + fields, + fill_null: send_null, + with_fields: send_fields, + }; + + Some(Ok(response)) + } else { + None + } + } + (_, _) => None, + } + .map_or_else(|| Err(QueryError::CacheMiss), |ret_val| ret_val) +} + pub fn authorize_and_set_filter_tags( query: &mut LogicalQuery, permissions: Vec<Permission>, @@ -200,20 +378,7 @@ pub async fn into_query( return Err(QueryError::EmptyEndTime); } - let start: DateTime<Utc>; - let end: DateTime<Utc>; - - if query.end_time == "now" { - end = Utc::now(); - start = end -
chrono::Duration::from_std(humantime::parse_duration(&query.start_time)?)?; - } else { - start = DateTime::parse_from_rfc3339(&query.start_time) - .map_err(|_| QueryError::StartTimeParse)? - .into(); - end = DateTime::parse_from_rfc3339(&query.end_time) - .map_err(|_| QueryError::EndTimeParse)? - .into(); - }; + let (start, end) = parse_human_time(&query.start_time, &query.end_time)?; if start.timestamp() > end.timestamp() { return Err(QueryError::StartTimeAfterEndTime); } @@ -227,6 +392,28 @@ pub async fn into_query( }) } +fn parse_human_time( + start_time: &str, + end_time: &str, +) -> Result<(DateTime<Utc>, DateTime<Utc>), QueryError> { + let start: DateTime<Utc>; + let end: DateTime<Utc>; + + if end_time == "now" { + end = Utc::now(); + start = end - chrono::Duration::from_std(humantime::parse_duration(start_time)?)?; + } else { + start = DateTime::parse_from_rfc3339(start_time) + .map_err(|_| QueryError::StartTimeParse)? + .into(); + end = DateTime::parse_from_rfc3339(end_time) + .map_err(|_| QueryError::EndTimeParse)? + .into(); + }; + + Ok((start, end)) +} + /// unused for now, might need it in the future #[allow(unused)] fn transform_query_for_ingestor(query: &Query) -> Option<Query> { @@ -290,15 +477,24 @@ pub enum QueryError { Execute(#[from] ExecuteError), #[error("ObjectStorage Error: {0}")] ObjectStorage(#[from] ObjectStorageError), + #[error("Cache Error: {0}")] + CacheError(#[from] CacheError), + #[error("")] + CacheMiss, #[error("Event Error: {0}")] EventError(#[from] EventError), #[error("Error: {0}")] - MalformedQuery(String), + MalformedQuery(&'static str), + #[allow(unused)] #[error( r#"Error: Failed to Parse Record Batch into Json Description: {0}"# )] JsonParse(String), + #[error("Error: {0}")] + ActixError(#[from] actix_web::Error), + #[error("Error: {0}")] + Anyhow(#[from] anyhow::Error), } impl actix_web::ResponseError for QueryError { diff --git a/server/src/handlers/http/users/dashboards.rs b/server/src/handlers/http/users/dashboards.rs new file mode 100644 index 000000000..4f2bbc6d3 --- /dev/null +++ b/server/src/handlers/http/users/dashboards.rs @@ -0,0 +1,148 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ * + */ + +use crate::{ + handlers::http::ingest::PostError, + option::CONFIG, + storage::{object_storage::dashboard_path, ObjectStorageError}, + users::dashboards::{Dashboard, DASHBOARDS}, +}; +use actix_web::{http::header::ContentType, web, HttpRequest, HttpResponse, Responder}; +use bytes::Bytes; + +use http::StatusCode; +use serde_json::{Error as SerdeError, Value as JsonValue}; + +pub async fn list(req: HttpRequest) -> Result<impl Responder, PostError> { + let user_id = req + .match_info() + .get("user_id") + .ok_or(DashboardError::Metadata("No User Id Provided"))?; + + // .users/user_id/dashboards/ + let path = dashboard_path(user_id, ""); + + let store = CONFIG.storage().get_object_store(); + let dashboards = store + .get_objects( + Some(&path), + Box::new(|file_name: String| file_name.ends_with("json")), + ) + .await?; + + let mut dash = vec![]; + for dashboard in dashboards { + dash.push(serde_json::from_slice::<Dashboard>(&dashboard)?) + } + + Ok((web::Json(dash), StatusCode::OK)) +} + +pub async fn get(req: HttpRequest) -> Result<impl Responder, PostError> { + let user_id = req + .match_info() + .get("user_id") + .ok_or(DashboardError::Metadata("No User Id Provided"))?; + + let dash_id = req + .match_info() + .get("dashboard_id") + .ok_or(DashboardError::Metadata("No Dashboard Id Provided"))?; + + if let Some(dashboard) = DASHBOARDS.find(dash_id) { + return Ok((web::Json(dashboard), StatusCode::OK)); + } + + //if dashboard is not in memory fetch from s3 + let dash_file_path = dashboard_path(user_id, &format!("{}.json", dash_id)); + let resource = CONFIG + .storage() + .get_object_store() + .get_object(&dash_file_path) + .await?; + let resource = serde_json::from_slice::<Dashboard>(&resource)?; + + Ok((web::Json(resource), StatusCode::OK)) +} + +pub async fn post(req: HttpRequest, body: Bytes) -> Result<impl Responder, PostError> { + let user_id = req + .match_info() + .get("user_id") + .ok_or(DashboardError::Metadata("No User Id Provided"))?; + + let dash_id = req + .match_info() + .get("dashboard_id") + .ok_or(DashboardError::Metadata("No Dashboard Id Provided"))?; + + let dash_file_path = dashboard_path(user_id, &format!("{}.json", dash_id)); + + let dashboard = serde_json::from_slice::<Dashboard>(&body)?; + DASHBOARDS.update(dashboard); + + let store = CONFIG.storage().get_object_store(); + store.put_object(&dash_file_path, body).await?; + + Ok(HttpResponse::Ok().finish()) +} + +pub async fn delete(req: HttpRequest) -> Result<impl Responder, PostError> { + let user_id = req + .match_info() + .get("user_id") + .ok_or(DashboardError::Metadata("No User Id Provided"))?; + + let dash_id = req + .match_info() + .get("dashboard_id") + .ok_or(DashboardError::Metadata("No Dashboard Id Provided"))?; + + let dash_file_path = dashboard_path(user_id, &format!("{}.json", dash_id)); + + let store = CONFIG.storage().get_object_store(); + store.delete_object(&dash_file_path).await?; + + Ok(HttpResponse::Ok().finish()) +} + +#[derive(Debug, thiserror::Error)] +pub enum DashboardError { + #[error("Failed to connect to storage: {0}")] + ObjectStorage(#[from] ObjectStorageError), + #[error("Serde Error: {0}")] + Serde(#[from] SerdeError), + #[error("Cannot perform this operation: {0}")] + Metadata(&'static str), +} + +impl actix_web::ResponseError for DashboardError { + fn status_code(&self) -> http::StatusCode { + match self { + Self::ObjectStorage(_) => StatusCode::INTERNAL_SERVER_ERROR, + Self::Serde(_) => StatusCode::BAD_REQUEST, + Self::Metadata(_) => StatusCode::BAD_REQUEST, + } + } + + fn error_response(&self) -> actix_web::HttpResponse { + actix_web::HttpResponse::build(self.status_code()) + 
.insert_header(ContentType::plaintext()) + .body(self.to_string()) + } +} diff --git a/server/src/handlers/http/users/filters.rs b/server/src/handlers/http/users/filters.rs new file mode 100644 index 000000000..49179667e --- /dev/null +++ b/server/src/handlers/http/users/filters.rs @@ -0,0 +1,182 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <https://www.gnu.org/licenses/>. + * + */ + +use crate::{ + handlers::{http::ingest::PostError, STREAM_NAME_HEADER_KEY}, + option::CONFIG, + storage::{object_storage::filter_path, ObjectStorageError}, + users::filters::{Filter, FILTERS}, +}; +use actix_web::{http::header::ContentType, web, HttpRequest, HttpResponse, Responder}; +use bytes::Bytes; +use http::StatusCode; +use serde_json::{Error as SerdeError, Value as JsonValue}; + +pub async fn list(req: HttpRequest) -> Result<impl Responder, PostError> { + let user_id = req + .match_info() + .get("user_id") + .ok_or(FiltersError::Metadata("No User Id Provided"))?; + + let stream_name = req + .headers() + .iter() + .find(|&(key, _)| key == STREAM_NAME_HEADER_KEY) + .ok_or_else(|| FiltersError::Metadata("Stream Name Not Provided"))? + .1 + .to_str() + .map_err(|_| FiltersError::Metadata("Non ASCII Stream Name Provided"))?; + + // .users/user_id/filters/stream_name/filter_id + let path = filter_path(user_id, stream_name, ""); + + let store = CONFIG.storage().get_object_store(); + let filters = store + .get_objects( + Some(&path), + Box::new(|file_name: String| file_name.ends_with("json")), + ) + .await?; + + let mut filt = vec![]; + for filter in filters { + filt.push(serde_json::from_slice::<Filter>(&filter)?) + } + + Ok((web::Json(filt), StatusCode::OK)) +} + +pub async fn get(req: HttpRequest) -> Result<impl Responder, PostError> { + let user_id = req + .match_info() + .get("user_id") + .ok_or(FiltersError::Metadata("No User Id Provided"))?; + + let filt_id = req + .match_info() + .get("filter_id") + .ok_or(FiltersError::Metadata("No Filter Id Provided"))?; + + let stream_name = req + .headers() + .iter() + .find(|&(key, _)| key == STREAM_NAME_HEADER_KEY) + .ok_or_else(|| FiltersError::Metadata("Stream Name Not Provided"))?
+ .1 + .to_str() + .map_err(|_| FiltersError::Metadata("Non ASCII Stream Name Provided"))?; + + if let Some(filter) = FILTERS.find(filt_id) { + return Ok((web::Json(filter), StatusCode::OK)); + } + + // if it is not in memory go to s3 + let path = filter_path(user_id, stream_name, &format!("{}.json", filt_id)); + let resource = CONFIG + .storage() + .get_object_store() + .get_object(&path) + .await?; + + let resource = serde_json::from_slice::<Filter>(&resource)?; + + Ok((web::Json(resource), StatusCode::OK)) +} + +pub async fn post(req: HttpRequest, body: Bytes) -> Result<impl Responder, PostError> { + let user_id = req + .match_info() + .get("user_id") + .ok_or(FiltersError::Metadata("No User Id Provided"))?; + + let filt_id = req + .match_info() + .get("filter_id") + .ok_or(FiltersError::Metadata("No Filter Id Provided"))?; + + let stream_name = req + .headers() + .iter() + .find(|&(key, _)| key == STREAM_NAME_HEADER_KEY) + .ok_or_else(|| FiltersError::Metadata("Stream Name Not Provided"))? + .1 + .to_str() + .map_err(|_| FiltersError::Metadata("Non ASCII Stream Name Provided"))?; + + let path = filter_path(user_id, stream_name, &format!("{}.json", filt_id)); + let filter: Filter = serde_json::from_slice(&body)?; + FILTERS.update(filter); + + let store = CONFIG.storage().get_object_store(); + store.put_object(&path, body).await?; + + Ok(HttpResponse::Ok().finish()) +} + +pub async fn delete(req: HttpRequest) -> Result<impl Responder, PostError> { + let user_id = req + .match_info() + .get("user_id") + .ok_or(FiltersError::Metadata("No User Id Provided"))?; + + let filt_id = req + .match_info() + .get("filter_id") + .ok_or(FiltersError::Metadata("No Filter Id Provided"))?; + + let stream_name = req + .headers() + .iter() + .find(|&(key, _)| key == STREAM_NAME_HEADER_KEY) + .ok_or_else(|| FiltersError::Metadata("Stream Name Not Provided"))? + .1 + .to_str() + .map_err(|_| FiltersError::Metadata("Non ASCII Stream Name Provided"))?; + + let path = filter_path(user_id, stream_name, &format!("{}.json", filt_id)); + let store = CONFIG.storage().get_object_store(); + store.delete_object(&path).await?; + + Ok(HttpResponse::Ok().finish()) +} + +#[derive(Debug, thiserror::Error)] +pub enum FiltersError { + #[error("Failed to connect to storage: {0}")] + ObjectStorage(#[from] ObjectStorageError), + #[error("Serde Error: {0}")] + Serde(#[from] SerdeError), + #[error("Cannot perform this operation: {0}")] + Metadata(&'static str), +} + +impl actix_web::ResponseError for FiltersError { + fn status_code(&self) -> http::StatusCode { + match self { + Self::ObjectStorage(_) => StatusCode::INTERNAL_SERVER_ERROR, + Self::Serde(_) => StatusCode::BAD_REQUEST, + Self::Metadata(_) => StatusCode::BAD_REQUEST, + } + } + + fn error_response(&self) -> actix_web::HttpResponse { + actix_web::HttpResponse::build(self.status_code()) + .insert_header(ContentType::plaintext()) + .body(self.to_string()) + } +} diff --git a/server/src/handlers/http/users/mod.rs b/server/src/handlers/http/users/mod.rs new file mode 100644 index 000000000..e4931aef6 --- /dev/null +++ b/server/src/handlers/http/users/mod.rs @@ -0,0 +1,24 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version.
+ * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <https://www.gnu.org/licenses/>. + * + */ + +pub mod dashboards; +pub mod filters; + +pub const USERS_ROOT_DIR: &str = ".users"; +pub const DASHBOARDS_DIR: &str = "dashboards"; +pub const FILTER_DIR: &str = "filters"; diff --git a/server/src/localcache.rs b/server/src/localcache.rs index 9ba0c15f8..07ee3eaaa 100644 --- a/server/src/localcache.rs +++ b/server/src/localcache.rs @@ -25,9 +25,10 @@ use human_size::{Byte, Gigibyte, SpecificSize}; use itertools::{Either, Itertools}; use object_store::{local::LocalFileSystem, ObjectStore}; use once_cell::sync::OnceCell; +use parquet::errors::ParquetError; use tokio::{fs, sync::Mutex}; -use crate::option::CONFIG; +use crate::{metadata::error::stream_info::MetadataError, option::CONFIG}; pub const STREAM_CACHE_FILENAME: &str = ".cache.json"; pub const CACHE_META_FILENAME: &str = ".cache_meta.json"; @@ -256,4 +257,12 @@ pub enum CacheError { MoveError(#[from] fs_extra::error::Error), #[error("{0}")] ObjectStoreError(#[from] object_store::Error), + #[error("{0}")] + ParquetError(#[from] ParquetError), + #[error("{0}")] + MetadataError(#[from] MetadataError), + #[error("Error: Cache File Does Not Exist")] + DoesNotExist, + #[error("Error: {0}")] + Other(&'static str), } diff --git a/server/src/main.rs b/server/src/main.rs index 95cbcb919..e9d92abfc 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -32,12 +32,14 @@ mod migration; mod oidc; mod option; mod query; +mod querycache; mod rbac; mod response; mod static_schema; mod stats; mod storage; mod sync; +mod users; mod utils; mod validator; diff --git a/server/src/metrics/prom_utils.rs b/server/src/metrics/prom_utils.rs index cd01fbd76..2df0d85a0 100644 --- a/server/src/metrics/prom_utils.rs +++ b/server/src/metrics/prom_utils.rs @@ -1,3 +1,21 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ * + */ + use crate::handlers::http::base_path_without_preceding_slash; use crate::handlers::http::ingest::PostError; use crate::handlers::http::modal::IngestorMetadata; diff --git a/server/src/migration.rs b/server/src/migration.rs index 2c543516e..d8c89b578 100644 --- a/server/src/migration.rs +++ b/server/src/migration.rs @@ -158,7 +158,7 @@ async fn migration_stream(stream: &str, storage: &dyn ObjectStorage) -> anyhow:: } #[inline(always)] -fn to_bytes(any: &(impl ?Sized + Serialize)) -> Bytes { +pub fn to_bytes(any: &(impl ?Sized + Serialize)) -> Bytes { serde_json::to_vec(any) .map(|any| any.into()) .expect("serialize cannot fail") } diff --git a/server/src/query.rs b/server/src/query.rs index ce80ba2e1..7239467c0 100644 --- a/server/src/query.rs +++ b/server/src/query.rs @@ -186,6 +186,9 @@ impl TableScanVisitor { pub fn into_inner(self) -> Vec<String> { self.tables } + pub fn top(&self) -> Option<&str> { + self.tables.first().map(|s| s.as_ref()) + } } impl TreeNodeVisitor for TableScanVisitor { diff --git a/server/src/query/listing_table_builder.rs b/server/src/query/listing_table_builder.rs index ee5b8c0d1..278ed5783 100644 --- a/server/src/query/listing_table_builder.rs +++ b/server/src/query/listing_table_builder.rs @@ -167,7 +167,6 @@ impl ListingTableBuilder { }) .try_collect() .await - // TODO: make the err map better .map_err(|err| DataFusionError::External(Box::new(err)))?; let mut res = res.into_iter().flatten().collect_vec(); diff --git a/server/src/query/stream_schema_provider.rs b/server/src/query/stream_schema_provider.rs index dccb3aa84..7100ac95a 100644 --- a/server/src/query/stream_schema_provider.rs +++ b/server/src/query/stream_schema_provider.rs @@ -180,8 +180,8 @@ async fn collect_from_snapshot( .map(|item| item.manifest_path) .collect(), ) - .await - .map_err(DataFusionError::ObjectStore)?; + .await?; + let mut manifest_files: Vec<_> = manifest_files .into_iter() .flat_map(|file| file.files) diff --git a/server/src/querycache.rs b/server/src/querycache.rs new file mode 100644 index 000000000..4086de7b8 --- /dev/null +++ b/server/src/querycache.rs @@ -0,0 +1,409 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ * + */ + +use arrow_array::RecordBatch; +use chrono::Utc; +use futures::TryStreamExt; +use futures_util::TryFutureExt; +use hashlru::Cache; +use human_size::{Byte, Gigibyte, SpecificSize}; +use itertools::Itertools; +use object_store::{local::LocalFileSystem, ObjectStore}; +use once_cell::sync::OnceCell; +use parquet::arrow::{AsyncArrowWriter, ParquetRecordBatchStreamBuilder}; +use std::collections::HashMap; +use std::path::{Path, PathBuf}; +use tokio::fs as AsyncFs; +use tokio::{fs, sync::Mutex}; + +use crate::handlers::http::users::USERS_ROOT_DIR; +use crate::metadata::STREAM_INFO; +use crate::storage::staging::parquet_writer_props; +use crate::{localcache::CacheError, option::CONFIG, utils::hostname_unchecked}; + +pub const QUERY_CACHE_FILENAME: &str = ".cache.json"; +pub const QUERY_CACHE_META_FILENAME: &str = ".cache_meta.json"; +pub const CURRENT_QUERY_CACHE_VERSION: &str = "v1"; + +#[derive(Default, Clone, serde::Deserialize, serde::Serialize, Debug, Hash, Eq, PartialEq)] +pub struct CacheMetadata { + pub query: String, + pub start_time: String, + pub end_time: String, +} + +impl CacheMetadata { + pub const fn new(query: String, start_time: String, end_time: String) -> Self { + Self { + query, + start_time, + end_time, + } + } +} + +#[derive(Debug, serde::Deserialize, serde::Serialize)] +pub struct QueryCache { + version: String, + current_size: u64, + + /// Mapping between storage path and cache path. + files: Cache<CacheMetadata, PathBuf>, +} + +impl QueryCache { + fn new() -> Self { + Self { + version: CURRENT_QUERY_CACHE_VERSION.to_string(), + current_size: 0, + files: Cache::new(100), + } + } + + pub fn get_file(&mut self, key: &CacheMetadata) -> Option<PathBuf> { + self.files.get(key).cloned() + } + + pub fn used_cache_size(&self) -> u64 { + self.current_size + } + + pub fn remove(&mut self, key: &CacheMetadata) -> Option<PathBuf> { + self.files.remove(key) + } + + pub async fn delete(&mut self, key: &CacheMetadata, path: PathBuf) -> Result<(), CacheError> { + self.files.delete(key); + AsyncFs::remove_file(path).await?; + + Ok(()) + } + + pub fn queries(&self) -> Vec<&CacheMetadata> { + self.files.keys().collect_vec() + } + + // read the parquet + // return the recordbatches + pub async fn get_cached_records( + &self, + path: &PathBuf, + ) -> Result<(Vec<RecordBatch>, Vec<String>), CacheError> { + let file = AsyncFs::File::open(path).await?; + let builder = ParquetRecordBatchStreamBuilder::new(file).await?; + // Build an async parquet reader.
+ let stream = builder.build()?; + + let records = stream.try_collect::<Vec<_>>().await?; + let fields = records.first().map_or_else(Vec::new, |record| { + record + .schema() + .fields() + .iter() + .map(|field| field.name()) + .cloned() + .collect_vec() + }); + + Ok((records, fields)) + } +} + +// .cache_meta.json +#[derive(Debug, serde::Deserialize, serde::Serialize)] +pub struct QueryCacheMeta { + version: String, + size_capacity: u64, +} + +impl QueryCacheMeta { + fn new() -> Self { + Self { + version: CURRENT_QUERY_CACHE_VERSION.to_string(), + size_capacity: 0, + } + } +} + +pub struct QueryCacheManager { + filesystem: LocalFileSystem, + cache_path: PathBuf, // refers to the path passed in the env var + total_cache_capacity: u64, + semaphore: Mutex<()>, +} + +impl QueryCacheManager { + pub fn gen_file_path(query_staging_path: &str, stream: &str, user_id: &str) -> PathBuf { + PathBuf::from_iter([ + query_staging_path, + USERS_ROOT_DIR, + user_id, + stream, + &format!( + "{}.{}.parquet", + hostname_unchecked(), + Utc::now().timestamp() + ), + ]) + } + pub async fn global(config_capacity: u64) -> Result<Option<&'static QueryCacheManager>, CacheError> { + static INSTANCE: OnceCell<QueryCacheManager> = OnceCell::new(); + + let cache_path = CONFIG.parseable.query_cache_path.as_ref(); + + if cache_path.is_none() { + return Ok(None); + } + + let cache_path = cache_path.unwrap(); + + let cache_manager = INSTANCE.get_or_init(|| { + let cache_path = cache_path.clone(); + std::fs::create_dir_all(&cache_path).unwrap(); + Self { + filesystem: LocalFileSystem::new(), + cache_path, + total_cache_capacity: CONFIG.parseable.query_cache_size, + semaphore: Mutex::new(()), + } + }); + + cache_manager.validate(config_capacity).await?; + + Ok(Some(cache_manager)) + } + + async fn validate(&self, config_capacity: u64) -> Result<(), CacheError> { + fs::create_dir_all(&self.cache_path).await?; + let path = query_cache_meta_path(&self.cache_path) + .map_err(|err| CacheError::ObjectStoreError(err.into()))?; + let resp = self + .filesystem + .get(&path) + .and_then(|resp| resp.bytes()) + .await; + + let updated_cache = match resp { + Ok(bytes) => { + let mut meta: QueryCacheMeta = serde_json::from_slice(&bytes)?; + if meta.size_capacity != config_capacity { + // log the change in cache size + let configured_size_human: SpecificSize<Gigibyte> = + SpecificSize::new(config_capacity as f64, Byte) + .unwrap() + .into(); + let current_size_human: SpecificSize<Gigibyte> = + SpecificSize::new(meta.size_capacity as f64, Byte) + .unwrap() + .into(); + log::warn!( + "Cache size is updated from {} to {}", + current_size_human, + configured_size_human + ); + meta.size_capacity = config_capacity; + Some(meta) + } else { + None + } + } + Err(object_store::Error::NotFound { .. }) => { + let mut meta = QueryCacheMeta::new(); + meta.size_capacity = config_capacity; + Some(meta) + } + Err(err) => return Err(err.into()), + }; + + if let Some(updated_cache) = updated_cache { + let result = self + .filesystem + .put(&path, serde_json::to_vec(&updated_cache)?.into()) + .await?; + log::info!("Cache meta file updated: {:?}", result); + } + + Ok(()) + } + + pub async fn get_cache(&self, stream: &str, user_id: &str) -> Result<QueryCache, CacheError> { + let path = query_cache_file_path(&self.cache_path, stream, user_id).unwrap(); + let res = self + .filesystem + .get(&path) + .and_then(|resp| resp.bytes()) + .await; + let cache = match res { + Ok(bytes) => serde_json::from_slice(&bytes)?, + Err(object_store::Error::NotFound { ..
}) => QueryCache::new(), + Err(err) => return Err(err.into()), + }; + Ok(cache) + } + + pub async fn remove_from_cache( + &self, + key: CacheMetadata, + stream: &str, + user_id: &str, + ) -> Result<(), CacheError> { + let mut cache = self.get_cache(stream, user_id).await?; + + if let Some(remove_result) = cache.remove(&key) { + self.put_cache(stream, &cache, user_id).await?; + tokio::spawn(fs::remove_file(remove_result)); + Ok(()) + } else { + Err(CacheError::DoesNotExist) + } + } + + pub async fn put_cache( + &self, + stream: &str, + cache: &QueryCache, + user_id: &str, + ) -> Result<(), CacheError> { + let path = query_cache_file_path(&self.cache_path, stream, user_id).unwrap(); + + let bytes = serde_json::to_vec(cache)?.into(); + let result = self.filesystem.put(&path, bytes).await?; + log::info!("Cache file updated: {:?}", result); + Ok(()) + } + + pub async fn move_to_cache( + &self, + stream: &str, + key: CacheMetadata, + file_path: &Path, + user_id: &str, + ) -> Result<(), CacheError> { + let lock = self.semaphore.lock().await; + let file_size = std::fs::metadata(file_path)?.len(); + let mut cache = self.get_cache(stream, user_id).await?; + + while cache.current_size + file_size > self.total_cache_capacity { + if let Some((_, file_for_removal)) = cache.files.pop_lru() { + let lru_file_size = fs::metadata(&file_for_removal).await?.len(); + cache.current_size = cache.current_size.saturating_sub(lru_file_size); + log::info!("removing cache entry"); + tokio::spawn(fs::remove_file(file_for_removal)); + } else { + log::error!("Cache size too small"); + break; + } + } + + if cache.files.is_full() { + cache.files.resize(cache.files.capacity() * 2); + } + cache.files.push(key, file_path.to_path_buf()); + cache.current_size += file_size; + self.put_cache(stream, &cache, user_id).await?; + drop(lock); + Ok(()) + } + + pub async fn create_parquet_cache( + &self, + table_name: &str, + records: &[RecordBatch], + user_id: &str, + start: String, + end: String, + query: String, + ) -> Result<(), CacheError> { + let parquet_path = Self::gen_file_path( + self.cache_path.to_str().expect("utf-8 compat path"), + user_id, + table_name, + ); + AsyncFs::create_dir_all(parquet_path.parent().expect("parent path exists")).await?; + let parquet_file = AsyncFs::File::create(&parquet_path).await?; + let time_partition = STREAM_INFO.get_time_partition(table_name)?; + let props = parquet_writer_props(time_partition.clone(), 0, HashMap::new()).build(); + + let sch = if let Some(record) = records.first() { + record.schema() + } else { + // the record batch is empty, do not cache and return early + return Ok(()); + }; + + let mut arrow_writer = AsyncArrowWriter::try_new(parquet_file, sch, Some(props))?; + + for record in records { + if let Err(e) = arrow_writer.write(record).await { + log::error!("Error While Writing to Query Cache: {}", e); + } + } + + arrow_writer.close().await?; + self.move_to_cache( + table_name, + CacheMetadata::new(query, start, end), + &parquet_path, + user_id, + ) + .await + } + + pub async fn clear_cache(&self, stream: &str, user_id: &str) -> Result<(), CacheError> { + let cache = self.get_cache(stream, user_id).await?; + let map = cache.files.values().collect_vec(); + let p_path = PathBuf::from_iter([USERS_ROOT_DIR, stream, user_id]); + let path = self.cache_path.join(p_path); + let mut paths = fs::read_dir(path).await?; + while let Some(path) = paths.next_entry().await? 
diff --git a/server/src/response.rs b/server/src/response.rs
index 0f5deb5ec..e2abfa2d2 100644
--- a/server/src/response.rs
+++ b/server/src/response.rs
@@ -16,11 +16,18 @@
  *
  */
 
-use crate::{handlers::http::query::QueryError, utils::arrow::record_batches_to_json};
+use crate::{
+    handlers::http::query::QueryError,
+    utils::arrow::{
+        flight::{into_flight_data, DoGetStream},
+        record_batches_to_json,
+    },
+};
 use actix_web::{web, Responder};
 use datafusion::arrow::record_batch::RecordBatch;
 use itertools::Itertools;
 use serde_json::{json, Value};
+use tonic::{Response, Status};
 
 pub struct QueryResponse {
     pub records: Vec<RecordBatch>,
@@ -33,8 +40,8 @@ impl QueryResponse {
     pub fn to_http(&self) -> Result<impl Responder, QueryError> {
         log::info!("{}", "Returning query results");
         let records: Vec<&RecordBatch> = self.records.iter().collect();
-        let mut json_records = record_batches_to_json(&records)
-            .map_err(|err| QueryError::JsonParse(err.to_string()))?;
+        let mut json_records = record_batches_to_json(&records)?;
+
         if self.fill_null {
             for map in &mut json_records {
                 for field in &self.fields {
@@ -57,4 +64,8 @@ impl QueryResponse {
 
         Ok(web::Json(response))
     }
+
+    pub fn into_flight(self) -> Result<Response<DoGetStream>, Status> {
+        into_flight_data(self.records)
+    }
 }
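For reference, the `fill_null` pass that `to_http` applies is easy to see in isolation: every selected field that a row is missing gets an explicit JSON null. A standalone sketch (the free function `fill_null` is hypothetical; the diff does this inline):

```rust
// Sketch of the fill_null loop in QueryResponse::to_http.
use serde_json::{json, Map, Value};

fn fill_null(rows: &mut [Map<String, Value>], fields: &[String]) {
    for row in rows {
        for field in fields {
            if !row.contains_key(field) {
                row.insert(field.clone(), Value::Null);
            }
        }
    }
}

fn main() {
    let mut rows = vec![json!({"level": "info"}).as_object().unwrap().clone()];
    fill_null(&mut rows, &["level".into(), "message".into()]);
    // prints {"level":"info","message":null}
    println!("{}", Value::Object(rows.remove(0)));
}
```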
diff --git a/server/src/static_schema.rs b/server/src/static_schema.rs
index 488f88f88..5f9f60fcf 100644
--- a/server/src/static_schema.rs
+++ b/server/src/static_schema.rs
@@ -1,3 +1,21 @@
+/*
+ * Parseable Server (C) 2022 - 2024 Parseable, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ *
+ */
+
 use crate::event::{DEFAULT_METADATA_KEY, DEFAULT_TAGS_KEY, DEFAULT_TIMESTAMP_KEY};
 use crate::utils::arrow::get_field;
 use anyhow::{anyhow, Error as AnyError};
diff --git a/server/src/storage.rs b/server/src/storage.rs
index cf25ad826..d28dcb00e 100644
--- a/server/src/storage.rs
+++ b/server/src/storage.rs
@@ -16,7 +16,9 @@
  *
  */
 
-use crate::{catalog::snapshot::Snapshot, stats::FullStats};
+use crate::{
+    catalog::snapshot::Snapshot, metadata::error::stream_info::MetadataError, stats::FullStats,
+};
 
 use chrono::Local;
 
@@ -209,6 +211,8 @@ pub enum ObjectStorageError {
     UnhandledError(Box<dyn std::error::Error + Send + Sync + 'static>),
     #[error("Error: {0}")]
     PathError(relative_path::FromPathError),
+    #[error("Error: {0}")]
+    MetadataError(#[from] MetadataError),
 
     #[allow(dead_code)]
     #[error("Authentication Error: {0}")]
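The `#[from]` attribute on the new `MetadataError` variant is what lets call sites lift that error into `ObjectStorageError` with a bare `?`, no manual `map_err` needed. A minimal illustration with both types reduced to stubs (names mirror the diff; bodies are invented):

```rust
// thiserror's #[from] generates the From impl that `?` relies on.
use thiserror::Error;

#[derive(Debug, Error)]
#[error("metadata error: {0}")]
struct MetadataError(String);

#[derive(Debug, Error)]
enum ObjectStorageError {
    #[error("Error: {0}")]
    MetadataError(#[from] MetadataError),
}

fn get_time_partition() -> Result<String, MetadataError> {
    Err(MetadataError("stream not found".into()))
}

fn caller() -> Result<(), ObjectStorageError> {
    let _tp = get_time_partition()?; // implicit From<MetadataError> conversion
    Ok(())
}

fn main() {
    println!("{:?}", caller());
}
```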
diff --git a/server/src/storage/metrics_layer.rs b/server/src/storage/metrics_layer.rs
index 7fa257074..04c2f3346 100644
--- a/server/src/storage/metrics_layer.rs
+++ b/server/src/storage/metrics_layer.rs
@@ -1,3 +1,21 @@
+/*
+ * Parseable Server (C) 2022 - 2024 Parseable, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ *
+ */
+
 use std::{
     ops::Range,
     task::{Context, Poll},
diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs
index f5945b86d..5af74cd60 100644
--- a/server/src/storage/object_storage.rs
+++ b/server/src/storage/object_storage.rs
@@ -26,6 +26,7 @@ use super::{
 };
 
 use crate::handlers::http::modal::ingest_server::INGESTOR_META;
+use crate::handlers::http::users::{DASHBOARDS_DIR, FILTER_DIR, USERS_ROOT_DIR};
 use crate::metrics::{LIFETIME_EVENTS_STORAGE_SIZE, STORAGE_SIZE_TODAY};
 use crate::option::Mode;
 use crate::{
@@ -66,7 +67,7 @@ pub trait ObjectStorageProvider: StorageMetrics + std::fmt::Debug {
 
 #[async_trait]
 pub trait ObjectStorage: Sync + 'static {
     async fn get_object(&self, path: &RelativePath) -> Result<Bytes, ObjectStorageError>;
-    // want to make it more generic with a filter function
+    // TODO: make the filter function optional as we may want to get all objects
     async fn get_objects(
         &self,
         base_path: Option<&RelativePath>,
@@ -603,6 +604,24 @@ pub fn stream_json_path(stream_name: &str) -> RelativePathBuf {
     }
 }
 
+/// if dashboard_file_name is an empty str, it is not appended to the rel path
+#[inline(always)]
+pub fn dashboard_path(user_id: &str, dashboard_file_name: &str) -> RelativePathBuf {
+    RelativePathBuf::from_iter([USERS_ROOT_DIR, user_id, DASHBOARDS_DIR, dashboard_file_name])
+}
+
+/// if filter_file_name is an empty str, it is not appended to the rel path
+#[inline(always)]
+pub fn filter_path(user_id: &str, stream_name: &str, filter_file_name: &str) -> RelativePathBuf {
+    RelativePathBuf::from_iter([
+        USERS_ROOT_DIR,
+        user_id,
+        FILTER_DIR,
+        stream_name,
+        filter_file_name,
+    ])
+}
+
 /// path will be ".parseable/.parsable.json"
 #[inline(always)]
 pub fn parseable_json_path() -> RelativePathBuf {
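The two new helpers pin down the per-user object-store layout. A runnable sketch using the same `relative-path` crate; the literal values of the three directory constants are assumptions, not taken from the diff:

```rust
// Sketch of the relative paths dashboard_path/filter_path produce.
use relative_path::RelativePathBuf;

const USERS_ROOT_DIR: &str = "users"; // assumed value
const DASHBOARDS_DIR: &str = "dashboards"; // assumed value
const FILTER_DIR: &str = "filters"; // assumed value

fn dashboard_path(user_id: &str, dashboard_file_name: &str) -> RelativePathBuf {
    RelativePathBuf::from_iter([USERS_ROOT_DIR, user_id, DASHBOARDS_DIR, dashboard_file_name])
}

fn filter_path(user_id: &str, stream_name: &str, filter_file_name: &str) -> RelativePathBuf {
    RelativePathBuf::from_iter([USERS_ROOT_DIR, user_id, FILTER_DIR, stream_name, filter_file_name])
}

fn main() {
    // users/alice/dashboards/my-dash.json
    println!("{}", dashboard_path("alice", "my-dash.json"));
    // users/alice/filters/app-logs/errors.json
    println!("{}", filter_path("alice", "app-logs", "errors.json"));
}
```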
diff --git a/server/src/storage/s3.rs b/server/src/storage/s3.rs
index 430aa6b0d..00ac3c01c 100644
--- a/server/src/storage/s3.rs
+++ b/server/src/storage/s3.rs
@@ -38,6 +38,7 @@ use std::path::Path as StdPath;
 use std::sync::Arc;
 use std::time::{Duration, Instant};
 
+use crate::handlers::http::users::USERS_ROOT_DIR;
 use crate::metrics::storage::{s3::REQUEST_RESPONSE_TIME, StorageMetrics};
 use crate::storage::{LogStream, ObjectStorage, ObjectStorageError, PARSEABLE_ROOT_DIRECTORY};
 
@@ -305,6 +306,7 @@ impl S3 {
             .filter_map(|path| path.parts().next())
             .map(|name| name.as_ref().to_string())
             .filter(|x| x != PARSEABLE_ROOT_DIRECTORY)
+            .filter(|x| x != USERS_ROOT_DIR)
             .collect();
 
         let stream_json_check = FuturesUnordered::new();
diff --git a/server/src/storage/staging.rs b/server/src/storage/staging.rs
index b30c8689c..82c819082 100644
--- a/server/src/storage/staging.rs
+++ b/server/src/storage/staging.rs
@@ -293,7 +293,7 @@ pub fn convert_disk_files_to_parquet(
     }
 }
 
-fn parquet_writer_props(
+pub fn parquet_writer_props(
     time_partition: Option<String>,
     index_time_partition: usize,
     custom_partition_fields: HashMap<String, usize>,
diff --git a/server/src/storage/store_metadata.rs b/server/src/storage/store_metadata.rs
index 44ae55868..d3ecd4040 100644
--- a/server/src/storage/store_metadata.rs
+++ b/server/src/storage/store_metadata.rs
@@ -116,9 +116,7 @@ pub async fn resolve_parseable_metadata() -> Result<StorageMetadata, ObjectStorageError> {
         // if server is started in ingest mode, we need to make sure that query mode has been started
        // i.e the metadata is updated to reflect the server mode = Query
-        if Mode::from_string(&metadata.server_mode).map_err(ObjectStorageError::Custom)? == Mode::All && CONFIG.parseable.mode == Mode::Ingest {
+        if Mode::from_string(&metadata.server_mode)
+            .map_err(ObjectStorageError::Custom)?
+            == Mode::All && CONFIG.parseable.mode == Mode::Ingest {
             Err("Starting Ingest Mode is not allowed, Since Query Server has not been started yet")
         } else {
             create_dir_all(CONFIG.staging_dir())?;
diff --git a/server/src/users/dashboards.rs b/server/src/users/dashboards.rs
new file mode 100644
index 000000000..561ec2507
--- /dev/null
+++ b/server/src/users/dashboards.rs
@@ -0,0 +1,96 @@
+/*
+ * Parseable Server (C) 2022 - 2024 Parseable, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ *
+ */
+
+use std::sync::RwLock;
+
+use once_cell::sync::Lazy;
+use relative_path::RelativePathBuf;
+use serde::{Deserialize, Serialize};
+
+use crate::{handlers::http::users::USERS_ROOT_DIR, metadata::LOCK_EXPECT, option::CONFIG};
+
+use super::TimeFilter;
+
+pub static DASHBOARDS: Lazy<Dashboards> = Lazy::new(Dashboards::default);
+
+#[derive(Debug, Serialize, Deserialize, Default, Clone)]
+pub struct Pannel {
+    stream_name: String,
+    query: String,
+    chart_type: String,
+    columns: Vec<String>,
+    headers: Vec<String>,
+    dimensions: Vec<String>,
+}
+
+#[derive(Debug, Serialize, Deserialize, Default, Clone)]
+pub struct Dashboard {
+    version: String,
+    dashboard_name: String,
+    dashboard_id: String,
+    time_filter: TimeFilter,
+    refresh_interval: u64,
+    pannels: Vec<Pannel>,
+}
+
+impl Dashboard {
+    pub fn dashboard_id(&self) -> &str {
+        &self.dashboard_id
+    }
+}
+
+#[derive(Default, Debug)]
+pub struct Dashboards(RwLock<Vec<Dashboard>>);
+
+impl Dashboards {
+    pub async fn load(&self) -> anyhow::Result<()> {
+        let mut this = vec![];
+        let path = RelativePathBuf::from(USERS_ROOT_DIR);
+        let store = CONFIG.storage().get_object_store();
+        let objs = store
+            .get_objects(Some(&path), Box::new(|path| path.ends_with(".json")))
+            .await
+            .unwrap_or_default();
+
+        for obj in objs {
+            if let Ok(dashboard) = serde_json::from_slice::<Dashboard>(&obj) {
+                this.push(dashboard);
+            }
+        }
+
+        let mut s = self.0.write().expect(LOCK_EXPECT);
+        s.append(&mut this);
+
+        Ok(())
+    }
+
+    pub fn update(&self, dashboard: Dashboard) {
+        let mut s = self.0.write().expect(LOCK_EXPECT);
+
+        s.push(dashboard);
+    }
+
+    pub fn find(&self, dashboard_id: &str) -> Option<Dashboard> {
+        self.0
+            .read()
+            .expect(LOCK_EXPECT)
+            .iter()
+            .find(|dashboard| dashboard.dashboard_id() == dashboard_id)
+            .cloned()
+    }
+}
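`Dashboards` here, and `Filters` in the next file, share one pattern: a process-wide registry behind `RwLock<Vec<_>>` with push-on-update and clone-on-find. Reduced to its essentials (the `Dashboard` struct is trimmed to one field for brevity):

```rust
// Minimal model of the RwLock<Vec<_>>-backed registry used by both modules.
use std::sync::RwLock;

#[derive(Clone, Debug)]
struct Dashboard {
    dashboard_id: String,
}

#[derive(Default)]
struct Dashboards(RwLock<Vec<Dashboard>>);

impl Dashboards {
    fn update(&self, d: Dashboard) {
        self.0.write().expect("lock poisoned").push(d);
    }

    fn find(&self, id: &str) -> Option<Dashboard> {
        self.0
            .read()
            .expect("lock poisoned")
            .iter()
            .find(|d| d.dashboard_id == id)
            .cloned()
    }
}

fn main() {
    let registry = Dashboards::default();
    registry.update(Dashboard { dashboard_id: "d1".into() });
    println!("{:?}", registry.find("d1"));
}
```

Cloning on lookup keeps read-lock hold times short at the cost of a copy per find, a reasonable trade for small per-user collections.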
diff --git a/server/src/users/filters.rs b/server/src/users/filters.rs
new file mode 100644
index 000000000..87580a8c6
--- /dev/null
+++ b/server/src/users/filters.rs
@@ -0,0 +1,85 @@
+/*
+ * Parseable Server (C) 2022 - 2024 Parseable, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ *
+ */
+
+use once_cell::sync::Lazy;
+use relative_path::RelativePathBuf;
+use serde::{Deserialize, Serialize};
+use std::sync::RwLock;
+
+use super::TimeFilter;
+use crate::{handlers::http::users::USERS_ROOT_DIR, metadata::LOCK_EXPECT, option::CONFIG};
+
+pub static FILTERS: Lazy<Filters> = Lazy::new(Filters::default);
+
+#[derive(Debug, Serialize, Deserialize, Clone)]
+pub struct Filter {
+    version: String,
+    stream_name: String,
+    filter_name: String,
+    filter_id: String,
+    query: String,
+    time_filter: Option<TimeFilter>,
+}
+
+impl Filter {
+    pub fn filter_id(&self) -> &str {
+        &self.filter_id
+    }
+}
+
+#[derive(Debug, Default)]
+pub struct Filters(RwLock<Vec<Filter>>);
+
+impl Filters {
+    pub async fn load(&self) -> anyhow::Result<()> {
+        let mut this = vec![];
+        let path = RelativePathBuf::from(USERS_ROOT_DIR);
+        let store = CONFIG.storage().get_object_store();
+
+        let objs = store
+            .get_objects(Some(&path), Box::new(|path| path.ends_with(".json")))
+            .await
+            .unwrap_or_default();
+
+        for obj in objs {
+            if let Ok(filter) = serde_json::from_slice::<Filter>(&obj) {
+                this.push(filter);
+            }
+        }
+
+        let mut s = self.0.write().expect(LOCK_EXPECT);
+        s.append(&mut this);
+
+        Ok(())
+    }
+
+    pub fn update(&self, filter: Filter) {
+        let mut s = self.0.write().expect(LOCK_EXPECT);
+
+        s.push(filter);
+    }
+
+    pub fn find(&self, filter_id: &str) -> Option<Filter> {
+        self.0
+            .read()
+            .expect(LOCK_EXPECT)
+            .iter()
+            .find(|filter| filter.filter_id() == filter_id)
+            .cloned()
+    }
+}
diff --git a/server/src/users/mod.rs b/server/src/users/mod.rs
new file mode 100644
index 000000000..7bfac6726
--- /dev/null
+++ b/server/src/users/mod.rs
@@ -0,0 +1,28 @@
+/*
+ * Parseable Server (C) 2022 - 2024 Parseable, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ *
+ */
+
+pub mod dashboards;
+pub mod filters;
+
+use serde::{Deserialize, Serialize};
+
+#[derive(Debug, Serialize, Deserialize, Default, Clone)]
+pub struct TimeFilter {
+    to: String,
+    from: String,
+}
diff --git a/server/src/utils.rs b/server/src/utils.rs
index 82f4f226e..84b604a51 100644
--- a/server/src/utils.rs
+++ b/server/src/utils.rs
@@ -280,7 +280,7 @@ pub fn get_ingestor_id() -> String {
     let result = format!("{:x}", hasher.finalize());
     let result = result.split_at(15).0.to_string();
     log::debug!("Ingestor ID: {}", &result);
-    result.to_string()
+    result
 }
 
 #[cfg(test)]
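`TimeFilter` is the one type shared by dashboards and filters; it round-trips through JSON as two plain strings. A quick sketch of the wire shape (the RFC3339-style values are illustrative, the diff does not constrain the format):

```rust
// Round-trip of the TimeFilter wire shape defined in users/mod.rs.
use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize, Default, Clone)]
struct TimeFilter {
    to: String,
    from: String,
}

fn main() {
    let tf: TimeFilter = serde_json::from_str(
        r#"{"to":"2024-05-01T00:00:00Z","from":"2024-04-30T00:00:00Z"}"#,
    )
    .unwrap();
    println!("{}", serde_json::to_string(&tf).unwrap());
}
```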
diff --git a/server/src/utils/arrow/flight.rs b/server/src/utils/arrow/flight.rs
index 113407c9d..3b0b17eba 100644
--- a/server/src/utils/arrow/flight.rs
+++ b/server/src/utils/arrow/flight.rs
@@ -1,3 +1,21 @@
+/*
+ * Parseable Server (C) 2022 - 2024 Parseable, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ *
+ */
+
 use crate::event::Event;
 use crate::handlers::http::ingest::push_logs_unchecked;
 use crate::handlers::http::query::Query as QueryJson;
@@ -9,21 +27,25 @@ use crate::{
 };
 
 use arrow_array::RecordBatch;
-use arrow_flight::Ticket;
+use arrow_flight::encode::FlightDataEncoderBuilder;
+use arrow_flight::{FlightData, Ticket};
+use arrow_ipc::writer::IpcWriteOptions;
 use arrow_select::concat::concat_batches;
 use datafusion::logical_expr::BinaryExpr;
 use datafusion::prelude::Expr;
 use datafusion::scalar::ScalarValue;
-use futures::TryStreamExt;
+use futures::{stream, TryStreamExt};
 
-use tonic::{Request, Status};
+use tonic::{Request, Response, Status};
 
 use arrow_flight::FlightClient;
 use http::Uri;
 use tonic::transport::Channel;
 
-pub fn get_query_from_ticket(req: Request<Ticket>) -> Result<QueryJson, Status> {
-    serde_json::from_slice::<QueryJson>(&req.into_inner().ticket)
+pub type DoGetStream = stream::BoxStream<'static, Result<FlightData, Status>>;
+
+pub fn get_query_from_ticket(req: &Request<Ticket>) -> Result<QueryJson, Status> {
+    serde_json::from_slice::<QueryJson>(&req.get_ref().ticket)
         .map_err(|err| Status::internal(err.to_string()))
 }
 
@@ -119,3 +141,20 @@ pub fn send_to_ingester(start: i64, end: i64) -> bool {
 
 fn lit_timestamp_milli(time: i64) -> Expr {
     Expr::Literal(ScalarValue::TimestampMillisecond(Some(time), None))
 }
+
+pub fn into_flight_data(records: Vec<RecordBatch>) -> Result<Response<DoGetStream>, Status> {
+    let input_stream = futures::stream::iter(records.into_iter().map(Ok));
+    let write_options = IpcWriteOptions::default()
+        // ZSTD IPC body compression
+        .try_with_compression(Some(arrow_ipc::CompressionType::ZSTD))
+        .map_err(|err| Status::failed_precondition(err.to_string()))?;
+
+    let flight_data_stream = FlightDataEncoderBuilder::new()
+        .with_max_flight_data_size(usize::MAX)
+        .with_options(write_options)
+        // .with_schema(schema.into())
+        .build(input_stream);
+
+    let flight_data_stream = flight_data_stream.map_err(|err| Status::unknown(err.to_string()));
+
+    Ok(Response::new(Box::pin(flight_data_stream) as DoGetStream))
+}
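`into_flight_data` wires record batches into a ZSTD-compressed Flight stream and hands it back as a tonic response. A standalone sketch of the same encoding path, collecting the encoded frames instead of returning a response (assumes the arrow-array/arrow-flight/arrow-ipc versions pinned above, plus tokio with the rt and macros features):

```rust
// Encode a Vec<RecordBatch> into FlightData frames with ZSTD IPC compression.
use arrow_array::{ArrayRef, Int64Array, RecordBatch};
use arrow_flight::encode::FlightDataEncoderBuilder;
use arrow_ipc::writer::IpcWriteOptions;
use futures::TryStreamExt;
use std::sync::Arc;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let batch = RecordBatch::try_from_iter([(
        "ts",
        Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef,
    )])?;

    let options = IpcWriteOptions::default()
        .try_with_compression(Some(arrow_ipc::CompressionType::ZSTD))?;

    // the builder infers the schema from the first batch in the stream
    let frames: Vec<_> = FlightDataEncoderBuilder::new()
        .with_options(options)
        .build(futures::stream::iter(vec![Ok(batch)]))
        .try_collect()
        .await?;

    println!("encoded {} FlightData frames", frames.len());
    Ok(())
}
```

The `with_max_flight_data_size(usize::MAX)` call in the diff effectively disables frame splitting; the sketch keeps the default size limit instead.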