Skip to content

Commit

Permalink
update querying with cache
Browse files Browse the repository at this point in the history
  • Loading branch information
Eshanatnight committed May 13, 2024
1 parent 31a6c21 commit 7798ddb
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 39 deletions.
2 changes: 2 additions & 0 deletions server/src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ pub mod livetail;
const PREFIX_TAGS: &str = "x-p-tag-";
const PREFIX_META: &str = "x-p-meta-";
const STREAM_NAME_HEADER_KEY: &str = "x-p-stream";
const CACHE_RESULTS_HEADER_KEY: &str = "x-p-cache-results";
const CACHE_VIEW_HEADER_KEY: &str = "x-p-show-cached";
const LOG_SOURCE_KEY: &str = "x-p-log-source";
const TIME_PARTITION_KEY: &str = "x-p-time-partition";
const TIME_PARTITION_LIMIT_KEY: &str = "x-p-time-partition-limit";
Expand Down
108 changes: 71 additions & 37 deletions server/src/handlers/http/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use crate::event::error::EventError;
use crate::handlers::http::fetch_schema;

use crate::event::commit_schema;
use crate::handlers::{CACHE_RESULTS_HEADER_KEY, CACHE_VIEW_HEADER_KEY};
use crate::localcache::CacheError;
use crate::metrics::QUERY_EXECUTE_TIME;
use crate::option::{Mode, CONFIG};
Expand Down Expand Up @@ -74,36 +75,59 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon
// create a visitor to extract the table name
let mut visitor = TableScanVisitor::default();
let _ = raw_logical_plan.visit(&mut visitor);
let stream = visitor.top();
let stream = visitor
.top()
.ok_or_else(|| QueryError::MalformedQuery("Table Name not found in SQL"))?;

let query_cache_manager = QueryCacheManager::global(CONFIG.parseable.query_cache_size)
.await
.unwrap_or(None);

if let Some(query_cache_manager) = query_cache_manager {
let mut query_cache = query_cache_manager.get_cache(stream).await?;

let (start, end) = parse_human_time(&query_request.start_time, &query_request.end_time)?;
let key = format!(
"{}-{}-{}",
start.to_rfc3339(),
end.to_rfc3339(),
query_request.query.clone()
);

let file_path = query_cache.get_file(key);
if let Some(file_path) = file_path {
let (records, fields) = query_cache.get_cached_records(&file_path).await?;
let response = QueryResponse {
records,
fields,
fill_null: query_request.send_null,
with_fields: query_request.fields,
}
.to_http()?;
let cache_results = req
.headers()
.iter()
.find(|&(key, _)| key == CACHE_RESULTS_HEADER_KEY);

let show_cached = req
.headers()
.iter()
.find(|&(key, _)| key == CACHE_VIEW_HEADER_KEY);

match (show_cached, query_cache_manager) {
(None, None) => {}
(None, Some(_)) => {}
(Some(_), None) => {
log::warn!(
"Instructed to show cached results but Query Caching is not Enabledon Server"
);
}
(Some(_), Some(query_cache_manager)) => {
let mut query_cache = query_cache_manager.get_cache(stream).await?;

let (start, end) =
parse_human_time(&query_request.start_time, &query_request.end_time)?;
let key = format!(
"{}-{}-{}",
start.to_rfc3339(),
end.to_rfc3339(),
query_request.query.clone()
);

let file_path = query_cache.get_file(key);
if let Some(file_path) = file_path {
let (records, fields) = query_cache.get_cached_records(&file_path).await?;
let response = QueryResponse {
records,
fields,
fill_null: query_request.send_null,
with_fields: query_request.fields,
}
.to_http()?;

return Ok(response);
return Ok(response);
}
}
};
}

let tables = visitor.into_inner();

Expand All @@ -125,23 +149,33 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon

let table_name = query
.first_table_name()
.ok_or_else(|| QueryError::MalformedQuery("No table name found in query".to_string()))?;
.ok_or_else(|| QueryError::MalformedQuery("No table name found in query"))?;
authorize_and_set_filter_tags(&mut query, permissions, &table_name)?;

let time = Instant::now();

let (records, fields) = query.execute(table_name.clone()).await?;
// put the rbs to parquet
if let Some(query_cache_manager) = query_cache_manager {
query_cache_manager
.create_parquet_cache(
&table_name,
&records,
query.start.to_rfc3339(),
query.end.to_rfc3339(),
query_request.query,
)
.await?;

match (cache_results, query_cache_manager) {
(None, None) => {}
(None, Some(_)) => {}
(Some(_), None) => {
log::warn!(
"Instructed to cache query results but Query Caching is not Enabled in Server"
);
}
// do cache
(Some(_), Some(query_cache_manager)) => {
query_cache_manager
.create_parquet_cache(
&table_name,
&records,
query.start.to_rfc3339(),
query.end.to_rfc3339(),
query_request.query,
)
.await?
}
}

let response = QueryResponse {
Expand Down Expand Up @@ -348,7 +382,7 @@ pub enum QueryError {
#[error("Evern Error: {0}")]
EventError(#[from] EventError),
#[error("Error: {0}")]
MalformedQuery(String),
MalformedQuery(&'static str),
#[error(
r#"Error: Failed to Parse Record Batch into Json
Description: {0}"#
Expand Down
4 changes: 2 additions & 2 deletions server/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,8 @@ impl TableScanVisitor {
pub fn into_inner(self) -> Vec<String> {
self.tables
}
pub fn top(&self) -> &str {
self.tables[0].as_ref()
pub fn top(&self) -> Option<&str> {
self.tables.first().map(|s| s.as_ref())
}
}

Expand Down

0 comments on commit 7798ddb

Please sign in to comment.