Skip to content

Commit

Permalink
impl query caching
Browse files Browse the repository at this point in the history
  • Loading branch information
Eshanatnight committed May 14, 2024
1 parent b832893 commit f45850c
Show file tree
Hide file tree
Showing 3 changed files with 202 additions and 132 deletions.
1 change: 1 addition & 0 deletions server/src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ const PREFIX_META: &str = "x-p-meta-";
const STREAM_NAME_HEADER_KEY: &str = "x-p-stream";
const CACHE_RESULTS_HEADER_KEY: &str = "x-p-cache-results";
const CACHE_VIEW_HEADER_KEY: &str = "x-p-show-cached";
const USER_ID_HEADER_KEY: &str = "x-p-user-id";
const LOG_SOURCE_KEY: &str = "x-p-log-source";
const TIME_PARTITION_KEY: &str = "x-p-time-partition";
const TIME_PARTITION_LIMIT_KEY: &str = "x-p-time-partition-limit";
Expand Down
227 changes: 146 additions & 81 deletions server/src/handlers/http/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,24 @@
use actix_web::http::header::ContentType;
use actix_web::web::{self, Json};
use actix_web::{FromRequest, HttpRequest, Responder};
use anyhow::anyhow;
use chrono::{DateTime, Utc};
use datafusion::common::tree_node::TreeNode;
use datafusion::error::DataFusionError;
use datafusion::execution::context::SessionState;
use futures_util::Future;
use http::StatusCode;
use http::{HeaderValue, StatusCode};
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Instant;

use crate::event::error::EventError;
use crate::handlers::http::fetch_schema;
use arrow_array::RecordBatch;

use crate::event::commit_schema;
use crate::handlers::{CACHE_RESULTS_HEADER_KEY, CACHE_VIEW_HEADER_KEY};
use crate::handlers::{CACHE_RESULTS_HEADER_KEY, CACHE_VIEW_HEADER_KEY, USER_ID_HEADER_KEY};
use crate::localcache::CacheError;
use crate::metrics::QUERY_EXECUTE_TIME;
use crate::option::{Mode, CONFIG};
Expand Down Expand Up @@ -83,116 +85,172 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon
.await
.unwrap_or(None);

let cache_results = req
let cache_results = req.headers().get(CACHE_RESULTS_HEADER_KEY);
let show_cached = req.headers().get(CACHE_VIEW_HEADER_KEY);
let user_id = req
.headers()
.iter()
.find(|&(key, _)| key == CACHE_RESULTS_HEADER_KEY);
.get(USER_ID_HEADER_KEY)
.ok_or_else(|| QueryError::Anyhow(anyhow!("User Id not provided")))?
.to_str()
.map_err(|err| anyhow!(err))?;

// deal with cached data
if let Ok(results) = get_results_from_cache(
show_cached,
query_cache_manager,
stream,
user_id,
&query_request.start_time,
&query_request.end_time,
&query_request.query,
query_request.send_null,
query_request.fields,
)
.await
{
return results.to_http();
};

let show_cached = req
.headers()
.iter()
.find(|&(key, _)| key == CACHE_VIEW_HEADER_KEY);
let tables = visitor.into_inner();
update_schema_when_distributed(tables).await?;
let mut query: LogicalQuery = into_query(&query_request, &session_state).await?;

match (show_cached, query_cache_manager) {
(None, None) => {}
(None, Some(_)) => {}
(Some(_), None) => {
log::warn!(
"Instructed to show cached results but Query Caching is not Enabledon Server"
);
}
(Some(_), Some(query_cache_manager)) => {
let mut query_cache = query_cache_manager.get_cache(stream).await?;

let (start, end) =
parse_human_time(&query_request.start_time, &query_request.end_time)?;
let key = format!(
"{}-{}-{}",
start.to_rfc3339(),
end.to_rfc3339(),
query_request.query.clone()
);
let creds = extract_session_key_from_req(&req)?;
let permissions = Users.get_permissions(&creds);

let file_path = query_cache.get_file(key);
if let Some(file_path) = file_path {
let (records, fields) = query_cache.get_cached_records(&file_path).await?;
let response = QueryResponse {
records,
fields,
fill_null: query_request.send_null,
with_fields: query_request.fields,
}
.to_http()?;
let table_name = query
.first_table_name()
.ok_or_else(|| QueryError::MalformedQuery("No table name found in query"))?;

return Ok(response);
}
}
authorize_and_set_filter_tags(&mut query, permissions, &table_name)?;

let time = Instant::now();
let (records, fields) = query.execute(table_name.clone()).await?;
// deal with cache saving
put_results_in_cache(
cache_results,
user_id,
query_cache_manager,
&table_name,
&records,
query.start.to_rfc3339(),
query.end.to_rfc3339(),
query_request.query,
)
.await;

let response = QueryResponse {
records,
fields,
fill_null: query_request.send_null,
with_fields: query_request.fields,
}
.to_http()?;

let tables = visitor.into_inner();
let time = time.elapsed().as_secs_f64();

QUERY_EXECUTE_TIME
.with_label_values(&[&table_name])
.observe(time);

Ok(response)
}

async fn update_schema_when_distributed(tables: Vec<String>) -> Result<(), QueryError> {
if CONFIG.parseable.mode == Mode::Query {
for table in tables {
if let Ok(new_schema) = fetch_schema(&table).await {
// commit schema merges the schema internally and updates the schema in storage.
commit_schema_to_storage(&table, new_schema.clone())
.await
.map_err(QueryError::ObjectStorage)?;
commit_schema(&table, Arc::new(new_schema)).map_err(QueryError::EventError)?;
commit_schema_to_storage(&table, new_schema.clone()).await?;

commit_schema(&table, Arc::new(new_schema))?;
}
}
}
let mut query: LogicalQuery = into_query(&query_request, &session_state).await?;

let creds = extract_session_key_from_req(&req).expect("expects basic auth");
let permissions: Vec<Permission> = Users.get_permissions(&creds);

let table_name = query
.first_table_name()
.ok_or_else(|| QueryError::MalformedQuery("No table name found in query"))?;
authorize_and_set_filter_tags(&mut query, permissions, &table_name)?;

let time = Instant::now();

let (records, fields) = query.execute(table_name.clone()).await?;
Ok(())
}

#[allow(clippy::too_many_arguments)]
async fn put_results_in_cache(
cache_results: Option<&HeaderValue>,
user_id: &str,
query_cache_manager: Option<&QueryCacheManager>,
stream: &str,
records: &[RecordBatch],
start: String,
end: String,
query: String,
) {
match (cache_results, query_cache_manager) {
(None, None) => {}
(None, Some(_)) => {}
(Some(_), None) => {
log::warn!(
"Instructed to cache query results but Query Caching is not Enabled in Server"
);
}
// do cache
(Some(_), Some(query_cache_manager)) => {
query_cache_manager
.create_parquet_cache(
&table_name,
&records,
query.start.to_rfc3339(),
query.end.to_rfc3339(),
query_request.query,
)
.await?
if let Err(err) = query_cache_manager
.create_parquet_cache(stream, records, user_id, start, end, query)
.await
{
log::error!("Error occured while caching query results: {:?}", err);
if query_cache_manager
.clear_cache(stream, user_id)
.await
.is_err()
{
log::error!("Error Clearing Unwanted files from cache dir");
}
}
}
(None, _) => {}
}
}

let response = QueryResponse {
records,
fields,
fill_null: query_request.send_null,
with_fields: query_request.fields,
}
.to_http()?;
#[allow(clippy::too_many_arguments)]
async fn get_results_from_cache(
show_cached: Option<&HeaderValue>,
query_cache_manager: Option<&QueryCacheManager>,
stream: &str,
user_id: &str,
start_time: &str,
end_time: &str,
query: &str,
send_null: bool,
send_fields: bool,
) -> Result<QueryResponse, QueryError> {
match (show_cached, query_cache_manager) {
(Some(_), None) => {
log::warn!(
"Instructed to show cached results but Query Caching is not Enabled on Server"
);
None
}
(Some(_), Some(query_cache_manager)) => {
let mut query_cache = query_cache_manager.get_cache(stream, user_id).await?;

let time = time.elapsed().as_secs_f64();
let (start, end) = parse_human_time(start_time, end_time)?;
let key = format!("{}-{}-{}", start.to_rfc3339(), end.to_rfc3339(), query);

QUERY_EXECUTE_TIME
.with_label_values(&[&table_name])
.observe(time);
let file_path = query_cache.get_file(key);
if let Some(file_path) = file_path {
let (records, fields) = query_cache.get_cached_records(&file_path).await?;
let response = QueryResponse {
records,
fields,
fill_null: send_null,
with_fields: send_fields,
};

Ok(response)
Some(Ok(response))
} else {
None
}
}
(_, _) => None,
}
.map_or_else(|| Err(QueryError::CacheMiss), |ret_val| ret_val)
}

fn authorize_and_set_filter_tags(
Expand Down Expand Up @@ -379,15 +437,22 @@ pub enum QueryError {
ObjectStorage(#[from] ObjectStorageError),
#[error("Cache Error: {0}")]
CacheError(#[from] CacheError),
#[error("")]
CacheMiss,
#[error("Evern Error: {0}")]
EventError(#[from] EventError),
#[error("Error: {0}")]
MalformedQuery(&'static str),
#[allow(unused)]
#[error(
r#"Error: Failed to Parse Record Batch into Json
Description: {0}"#
)]
JsonParse(String),
#[error("Error: {0}")]
ActixError(#[from] actix_web::Error),
#[error("Error: {0}")]
Anyhow(#[from] anyhow::Error),
}

impl actix_web::ResponseError for QueryError {
Expand Down
Loading

0 comments on commit f45850c

Please sign in to comment.