
Commit

Merge branch 'impl-query-result-caching' into dashboard-api-addition
Eshanatnight committed May 13, 2024
2 parents b1819f5 + 1fae167 commit 31a6c21
Showing 10 changed files with 459 additions and 23 deletions.
9 changes: 5 additions & 4 deletions server/Cargo.toml
@@ -9,13 +9,14 @@ build = "build.rs"

[dependencies]
### apache arrow/datafusion dependencies
# arrow = "51.0.0"
arrow-schema = { version = "51.0.0", features = ["serde"] }
arrow-array = { version = "51.0.0" }
arrow-json = "51.0.0"
arrow-ipc = "51.0.0"
arrow-select = "51.0.0"
datafusion = "37.1.0"
object_store = { version = "0.9.1", features = ["cloud", "aws"] }
object_store = { version = "0.9.1", features = ["cloud", "aws"] } # cannot update object_store as datafusion has not caught up
parquet = "51.0.0"

### LiveTail server deps
@@ -74,11 +75,11 @@ relative-path = { version = "1.7", features = ["serde"] }
reqwest = { version = "0.11.27", default_features = false, features = [
"rustls-tls",
"json",
] }
rustls = "0.22.4"
] } # cannot update because rustls is not the latest; see `rustls` below
rustls = "0.22.4" # cannot update to 0.23; actix has not caught up yet
rustls-pemfile = "2.1.2"
semver = "1.0"
serde = { version = "1.0", features = ["rc"] }
serde = { version = "1.0", features = ["rc", "derive"] }
serde_json = "1.0"
static-files = "0.2"
sysinfo = "0.30.11"
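Note on the Cargo.toml change: besides the version-pinning comments, the diff enables serde's "derive" feature. A plausible reason is that the new query-cache code derives Serialize/Deserialize on its metadata types directly; the struct below is a hypothetical illustration, not taken from this commit.

    // Hypothetical example: with features = ["rc", "derive"], serde's derive
    // macros are available without a separate serde_derive dependency.
    use serde::{Deserialize, Serialize};
    use std::path::PathBuf;

    #[derive(Serialize, Deserialize)]
    struct QueryCacheMeta {
        key: String,   // cache key, e.g. "<start>-<end>-<query>"
        file: PathBuf, // parquet file holding the cached records
    }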
32 changes: 32 additions & 0 deletions server/src/cli.rs
@@ -89,6 +89,12 @@ pub struct Cli {

/// public address for the parseable server ingestor
pub ingestor_endpoint: String,

/// path to be used for caching query results
pub query_cache_path: Option<PathBuf>,

/// size limit for the query result cache
pub query_cache_size: u64,
}

impl Cli {
@@ -99,6 +105,8 @@ impl Cli {
pub const DOMAIN_URI: &'static str = "origin";
pub const STAGING: &'static str = "local-staging-path";
pub const CACHE: &'static str = "cache-path";
pub const QUERY_CACHE: &'static str = "query-cache-path";
pub const QUERY_CACHE_SIZE: &'static str = "query-cache-size";
pub const CACHE_SIZE: &'static str = "cache-size";
pub const USERNAME: &'static str = "username";
pub const PASSWORD: &'static str = "password";
@@ -187,6 +195,25 @@ impl Cli {
.next_line_help(true),
)

.arg(
Arg::new(Self::QUERY_CACHE)
.long(Self::QUERY_CACHE)
.env("P_QUERY_CACHE_DIR")
.value_name("DIR")
.value_parser(validation::canonicalize_path)
.help("Local path on this device to be used for caching data")
.next_line_help(true),
)
.arg(
Arg::new(Self::QUERY_CACHE_SIZE)
.long(Self::QUERY_CACHE_SIZE)
.env("P_QUERY_CACHE_SIZE")
.value_name("size")
.default_value("1GiB")
.value_parser(validation::cache_size)
.help("Maximum allowed cache size for all streams combined (In human readable format, e.g 1GiB, 2GiB, 100MB)")
.next_line_help(true),
)
.arg(
Arg::new(Self::USERNAME)
.long(Self::USERNAME)
@@ -358,6 +385,7 @@ impl FromArgMatches for Cli {

fn update_from_arg_matches(&mut self, m: &clap::ArgMatches) -> Result<(), clap::Error> {
self.local_cache_path = m.get_one::<PathBuf>(Self::CACHE).cloned();
self.query_cache_path = m.get_one::<PathBuf>(Self::QUERY_CACHE).cloned();
self.tls_cert_path = m.get_one::<PathBuf>(Self::TLS_CERT).cloned();
self.tls_key_path = m.get_one::<PathBuf>(Self::TLS_KEY).cloned();
self.domain_address = m.get_one::<Url>(Self::DOMAIN_URI).cloned();
@@ -380,6 +408,10 @@ impl FromArgMatches for Cli {
.get_one::<u64>(Self::CACHE_SIZE)
.cloned()
.expect("default value for cache size");
self.query_cache_size = m
.get_one(Self::QUERY_CACHE_SIZE)
.cloned()
.expect("default value for query cache size");
self.username = m
.get_one::<String>(Self::USERNAME)
.cloned()
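The two new flags mirror the existing local-cache options: P_QUERY_CACHE_DIR selects the directory and P_QUERY_CACHE_SIZE caps the cache, with sizes given in human-readable form and validated by validation::cache_size, which is not part of this diff. As a rough sketch only, assuming integer values and a few common suffixes, such a validator could look like the following; the real implementation may differ.

    // Hypothetical sketch of a human-readable size parser; the actual logic
    // lives in `validation::cache_size`, outside this diff.
    fn parse_size(s: &str) -> Result<u64, String> {
        let s = s.trim();
        let (num, mult) = if let Some(n) = s.strip_suffix("GiB") {
            (n, 1024u64 * 1024 * 1024)
        } else if let Some(n) = s.strip_suffix("MiB") {
            (n, 1024 * 1024)
        } else if let Some(n) = s.strip_suffix("GB") {
            (n, 1_000_000_000)
        } else if let Some(n) = s.strip_suffix("MB") {
            (n, 1_000_000)
        } else {
            (s, 1) // bare number of bytes
        };
        num.trim()
            .parse::<u64>()
            .map(|v| v * mult)
            .map_err(|e| e.to_string())
    }

With the default of "1GiB", parse_size("1GiB") yields 1_073_741_824 bytes, matching the u64 stored in Cli::query_cache_size.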
86 changes: 71 additions & 15 deletions server/src/handlers/http/query.rs
@@ -34,11 +34,13 @@ use crate::event::error::EventError;
use crate::handlers::http::fetch_schema;

use crate::event::commit_schema;
use crate::localcache::CacheError;
use crate::metrics::QUERY_EXECUTE_TIME;
use crate::option::{Mode, CONFIG};
use crate::query::error::ExecuteError;
use crate::query::Query as LogicalQuery;
use crate::query::{TableScanVisitor, QUERY_SESSION};
use crate::querycache::QueryCacheManager;
use crate::rbac::role::{Action, Permission};
use crate::rbac::Users;
use crate::response::QueryResponse;
@@ -50,7 +52,7 @@ use crate::utils::actix::extract_session_key_from_req;
#[derive(Debug, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct Query {
query: String,
pub query: String,
start_time: String,
end_time: String,
#[serde(default)]
@@ -72,6 +74,36 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon
// create a visitor to extract the table name
let mut visitor = TableScanVisitor::default();
let _ = raw_logical_plan.visit(&mut visitor);
let stream = visitor.top();
let query_cache_manager = QueryCacheManager::global(CONFIG.parseable.query_cache_size)
.await
.unwrap_or(None);

if let Some(query_cache_manager) = query_cache_manager {
let mut query_cache = query_cache_manager.get_cache(stream).await?;

let (start, end) = parse_human_time(&query_request.start_time, &query_request.end_time)?;
let key = format!(
"{}-{}-{}",
start.to_rfc3339(),
end.to_rfc3339(),
query_request.query.clone()
);

let file_path = query_cache.get_file(key);
if let Some(file_path) = file_path {
let (records, fields) = query_cache.get_cached_records(&file_path).await?;
let response = QueryResponse {
records,
fields,
fill_null: query_request.send_null,
with_fields: query_request.fields,
}
.to_http()?;

return Ok(response);
}
};

let tables = visitor.into_inner();

@@ -99,6 +131,19 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon
let time = Instant::now();

let (records, fields) = query.execute(table_name.clone()).await?;
// write the record batches (rbs) to a parquet cache file
if let Some(query_cache_manager) = query_cache_manager {
query_cache_manager
.create_parquet_cache(
&table_name,
&records,
query.start.to_rfc3339(),
query.end.to_rfc3339(),
query_request.query,
)
.await?;
}

let response = QueryResponse {
records,
fields,
@@ -199,20 +244,7 @@ async fn into_query(
return Err(QueryError::EmptyEndTime);
}

let start: DateTime<Utc>;
let end: DateTime<Utc>;

if query.end_time == "now" {
end = Utc::now();
start = end - chrono::Duration::from_std(humantime::parse_duration(&query.start_time)?)?;
} else {
start = DateTime::parse_from_rfc3339(&query.start_time)
.map_err(|_| QueryError::StartTimeParse)?
.into();
end = DateTime::parse_from_rfc3339(&query.end_time)
.map_err(|_| QueryError::EndTimeParse)?
.into();
};
let (start, end) = parse_human_time(&query.start_time, &query.end_time)?;

if start.timestamp() > end.timestamp() {
return Err(QueryError::StartTimeAfterEndTime);
@@ -226,6 +258,28 @@ async fn into_query(
})
}

fn parse_human_time(
start_time: &str,
end_time: &str,
) -> Result<(DateTime<Utc>, DateTime<Utc>), QueryError> {
let start: DateTime<Utc>;
let end: DateTime<Utc>;

if end_time == "now" {
end = Utc::now();
start = end - chrono::Duration::from_std(humantime::parse_duration(start_time)?)?;
} else {
start = DateTime::parse_from_rfc3339(start_time)
.map_err(|_| QueryError::StartTimeParse)?
.into();
end = DateTime::parse_from_rfc3339(end_time)
.map_err(|_| QueryError::EndTimeParse)?
.into();
};

Ok((start, end))
}
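A brief usage note for the helper above: when end_time is the literal "now", start_time is interpreted as a humantime duration looking back from the current instant; otherwise both bounds must be RFC 3339 timestamps. A small illustrative sketch (values are made up):

    // Illustrative sketch; lives alongside `parse_human_time`, so `QueryError` is in scope.
    fn parse_examples() -> Result<(), QueryError> {
        // relative window: the last ten minutes
        let (start, end) = parse_human_time("10m", "now")?;
        assert_eq!(end - start, chrono::Duration::minutes(10));

        // absolute window: explicit RFC 3339 bounds
        let (start, end) = parse_human_time(
            "2024-05-13T00:00:00+00:00",
            "2024-05-13T01:00:00+00:00",
        )?;
        assert!(start < end);
        Ok(())
    }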

/// unused for now, might need it in the future
#[allow(unused)]
fn transform_query_for_ingestor(query: &Query) -> Option<Query> {
@@ -289,6 +343,8 @@ pub enum QueryError {
Execute(#[from] ExecuteError),
#[error("ObjectStorage Error: {0}")]
ObjectStorage(#[from] ObjectStorageError),
#[error("Cache Error: {0}")]
CacheError(#[from] CacheError),
#[error("Evern Error: {0}")]
EventError(#[from] EventError),
#[error("Error: {0}")]
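Taken together, the handler changes work as follows: resolve the time bounds, build a cache key from the RFC 3339 bounds plus the raw SQL text, and return records straight from the cache on a hit; on a miss, execute the query and write the resulting record batches to a parquet cache for next time. A compact sketch of the key construction (the cache manager itself lives in querycache.rs, which is not shown on this page):

    // Key format as used by the handler above; illustrative helper, not code from this commit.
    use chrono::{DateTime, Utc};

    fn query_cache_key(start: DateTime<Utc>, end: DateTime<Utc>, sql: &str) -> String {
        format!("{}-{}-{}", start.to_rfc3339(), end.to_rfc3339(), sql)
    }

One consequence of embedding absolute timestamps in the key: relative windows such as start "10m" with end "now" resolve to a different key on every request, so those queries will rarely, if ever, hit the cache.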
7 changes: 6 additions & 1 deletion server/src/localcache.rs
@@ -25,9 +25,10 @@ use human_size::{Byte, Gigibyte, SpecificSize};
use itertools::{Either, Itertools};
use object_store::{local::LocalFileSystem, ObjectStore};
use once_cell::sync::OnceCell;
use parquet::errors::ParquetError;
use tokio::{fs, sync::Mutex};

use crate::option::CONFIG;
use crate::{metadata::error::stream_info::MetadataError, option::CONFIG};

pub const STREAM_CACHE_FILENAME: &str = ".cache.json";
pub const CACHE_META_FILENAME: &str = ".cache_meta.json";
@@ -256,4 +257,8 @@ pub enum CacheError {
MoveError(#[from] fs_extra::error::Error),
#[error("{0}")]
ObjectStoreError(#[from] object_store::Error),
#[error("{0}")]
ParquetError(#[from] ParquetError),
#[error("{0}")]
MetadataError(#[from] MetadataError),
}
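The two new variants carry #[from], so thiserror generates the From impls and the ? operator converts parquet and stream-metadata failures into CacheError without explicit map_err. A minimal illustration (the parse_footer function is hypothetical):

    use parquet::errors::ParquetError;

    // Hypothetical fallible step that returns a parquet error.
    fn parse_footer(_bytes: &[u8]) -> Result<(), ParquetError> {
        Ok(())
    }

    fn load_cached(bytes: &[u8]) -> Result<(), CacheError> {
        // `?` uses the generated `From<ParquetError> for CacheError`;
        // `MetadataError` converts the same way via its own `#[from]` variant.
        parse_footer(bytes)?;
        Ok(())
    }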
1 change: 1 addition & 0 deletions server/src/main.rs
@@ -32,6 +32,7 @@ mod migration;
mod oidc;
mod option;
mod query;
mod querycache;
mod rbac;
mod response;
mod static_schema;
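The new querycache module itself is one of the changed files not shown on this page. From the call sites in handlers/http/query.rs above, its public surface presumably looks roughly like the skeleton below; the names come from those call sites, but every signature here is a guess, not the actual contents of server/src/querycache.rs.

    // Inferred from call sites only; bodies and signatures are placeholders.
    use std::path::PathBuf;
    use arrow_array::RecordBatch;
    use crate::localcache::CacheError;

    pub struct QueryCacheManager { /* ... */ }

    impl QueryCacheManager {
        // Global, lazily initialised manager; `None` presumably means no query cache path is configured.
        pub async fn global(_cache_size: u64) -> Result<Option<&'static Self>, CacheError> { unimplemented!() }

        // Per-stream cache handle.
        pub async fn get_cache(&self, _stream: &str) -> Result<QueryCache, CacheError> { unimplemented!() }

        // Write executed record batches to a parquet file keyed by time range and query text.
        pub async fn create_parquet_cache(
            &self,
            _table_name: &str,
            _records: &[RecordBatch],
            _start: String,
            _end: String,
            _query: String,
        ) -> Result<(), CacheError> { unimplemented!() }
    }

    pub struct QueryCache { /* ... */ }

    impl QueryCache {
        // Look up a previously cached parquet file by key.
        pub fn get_file(&mut self, _key: String) -> Option<PathBuf> { unimplemented!() }

        // Read cached record batches plus the field names back out of a parquet file.
        pub async fn get_cached_records(&self, _path: &PathBuf) -> Result<(Vec<RecordBatch>, Vec<String>), CacheError> { unimplemented!() }
    }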
2 changes: 1 addition & 1 deletion server/src/migration.rs
@@ -150,7 +150,7 @@ async fn migration_stream(stream: &str, storage: &dyn ObjectStorage) -> anyhow::
}

#[inline(always)]
fn to_bytes(any: &(impl ?Sized + Serialize)) -> Bytes {
pub fn to_bytes(any: &(impl ?Sized + Serialize)) -> Bytes {
serde_json::to_vec(any)
.map(|any| any.into())
.expect("serialize cannot fail")
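The only change here is visibility: to_bytes goes from private to pub, presumably so the query-cache code can reuse it when persisting cache metadata. A minimal hedged usage sketch (the JSON value is illustrative):

    use bytes::Bytes;

    fn serialize_meta() -> Bytes {
        // any Serialize value works; this one is made up for illustration
        let meta = serde_json::json!({ "version": "v1", "files": [] });
        crate::migration::to_bytes(&meta)
    }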
3 changes: 3 additions & 0 deletions server/src/query.rs
@@ -186,6 +186,9 @@ impl TableScanVisitor {
pub fn into_inner(self) -> Vec<String> {
self.tables
}
pub fn top(&self) -> &str {
self.tables[0].as_ref()
}
}

impl TreeNodeVisitor for TableScanVisitor {
