Add Users API for Dashboards, caching and filters #798

Merged

Changes from all commits (24 commits):
afaf87f  add the dashboards api (Eshanatnight, May 13, 2024)
4b22348  add: filters api (Eshanatnight, May 13, 2024)
f121f74  add cli args for query caching (Eshanatnight, May 7, 2024)
721abf5  misc changes (Eshanatnight, May 7, 2024)
e654177  update errors (Eshanatnight, May 7, 2024)
e081a1b  refactor: improve time parsing logic in query handler (Eshanatnight, May 7, 2024)
455dc81  impl query result caching (Eshanatnight, May 7, 2024)
35118c8  update querying with cache (Eshanatnight, May 13, 2024)
89428f1  chore: clean up (Eshanatnight, May 14, 2024)
810ad54  impl query caching (Eshanatnight, May 14, 2024)
284882c  add ability to store filters and dashboards in memory (Eshanatnight, May 15, 2024)
16f5a39  fix: bug if user_id is not provided (Eshanatnight, May 15, 2024)
6348896  misc add license headings (Eshanatnight, May 17, 2024)
9d35e81  cleanup rough edges (Eshanatnight, May 17, 2024)
83a358c  update arrow flight server to perform query cache (Eshanatnight, May 21, 2024)
e11f1db  fix: users root dir excluded when listing streams (Eshanatnight, May 21, 2024)
9e9b4c7  change the key for cache (Eshanatnight, May 21, 2024)
2d59640  add cache endpoints (Eshanatnight, May 21, 2024)
fd6422d  update .gitignore to exclude directory called cache (Eshanatnight, May 22, 2024)
77a7241  fix bug if user id is not provided and query caching is enabled (Eshanatnight, May 23, 2024)
341ee6e  add value check for the show_cached and cache_results headers (Eshanatnight, May 23, 2024)
a88356c  clean up for better readablility (Eshanatnight, May 23, 2024)
54db5ef  fix: the issue with caching data (Eshanatnight, May 24, 2024)
17d07d6  fix: issue creating multiple cache files (Eshanatnight, May 24, 2024)

2 changes: 1 addition & 1 deletion .gitignore
@@ -13,5 +13,5 @@ env-file
 parseable
 parseable_*
 parseable-env-secret
-cache*
+cache

9 changes: 5 additions & 4 deletions server/Cargo.toml
@@ -9,13 +9,14 @@ build = "build.rs"
 
 [dependencies]
 ### apache arrow/datafusion dependencies
+# arrow = "51.0.0"
 arrow-schema = { version = "51.0.0", features = ["serde"] }
 arrow-array = { version = "51.0.0" }
 arrow-json = "51.0.0"
 arrow-ipc = { version = "51.0.0", features = ["zstd"] }
 arrow-select = "51.0.0"
 datafusion = "37.1.0"
-object_store = { version = "0.9.1", features = ["cloud", "aws"] }
+object_store = { version = "0.9.1", features = ["cloud", "aws"] } # cannot update object_store as datafusion has not caught up
 parquet = "51.0.0"
 arrow-flight = { version = "51.0.0", features = [ "tls" ] }
 tonic = {version = "0.11.0", features = ["tls", "transport", "gzip", "zstd"] }
@@ -72,11 +73,11 @@ relative-path = { version = "1.7", features = ["serde"] }
 reqwest = { version = "0.11.27", default_features = false, features = [
     "rustls-tls",
     "json",
-] }
-rustls = "0.22.4"
+] } # cannot update cause rustls is not latest `see rustls`
+rustls = "0.22.4" # cannot update to 0.23 actix has not caught up yet
 rustls-pemfile = "2.1.2"
 semver = "1.0"
-serde = { version = "1.0", features = ["rc"] }
+serde = { version = "1.0", features = ["rc", "derive"] }
 serde_json = "1.0"
 static-files = "0.2"
 sysinfo = "0.30.11"
3 changes: 1 addition & 2 deletions server/src/catalog.rs
@@ -230,8 +230,7 @@ async fn create_manifest(
                 .ok_or(IOError::new(
                     ErrorKind::Other,
                     "Failed to create upper bound for manifest",
-                ))
-                .map_err(ObjectStorageError::IoError)?,
+                ))?,
             )
             .and_utc();
 
32 changes: 32 additions & 0 deletions server/src/cli.rs
@@ -92,6 +92,12 @@ pub struct Cli {
 
     /// port use by airplane(flight query service)
     pub flight_port: u16,
+
+    /// to query cached data
+    pub query_cache_path: Option<PathBuf>,
+
+    /// Size for local cache
+    pub query_cache_size: u64,
 }
 
 impl Cli {
@@ -102,6 +108,8 @@ impl Cli {
     pub const DOMAIN_URI: &'static str = "origin";
     pub const STAGING: &'static str = "local-staging-path";
     pub const CACHE: &'static str = "cache-path";
+    pub const QUERY_CACHE: &'static str = "query-cache-path";
+    pub const QUERY_CACHE_SIZE: &'static str = "query-cache-size";
     pub const CACHE_SIZE: &'static str = "cache-size";
     pub const USERNAME: &'static str = "username";
     pub const PASSWORD: &'static str = "password";
@@ -191,6 +199,25 @@ impl Cli {
                     .next_line_help(true),
             )
 
+            .arg(
+                Arg::new(Self::QUERY_CACHE)
+                    .long(Self::QUERY_CACHE)
+                    .env("P_QUERY_CACHE_DIR")
+                    .value_name("DIR")
+                    .value_parser(validation::canonicalize_path)
+                    .help("Local path on this device to be used for caching data")
+                    .next_line_help(true),
+            )
+            .arg(
+                Arg::new(Self::QUERY_CACHE_SIZE)
+                    .long(Self::QUERY_CACHE_SIZE)
+                    .env("P_QUERY_CACHE_SIZE")
+                    .value_name("size")
+                    .default_value("1GiB")
+                    .value_parser(validation::cache_size)
+                    .help("Maximum allowed cache size for all streams combined (In human readable format, e.g 1GiB, 2GiB, 100MB)")
+                    .next_line_help(true),
+            )
             .arg(
                 Arg::new(Self::USERNAME)
                     .long(Self::USERNAME)
@@ -372,6 +399,7 @@ impl FromArgMatches for Cli {
 
     fn update_from_arg_matches(&mut self, m: &clap::ArgMatches) -> Result<(), clap::Error> {
         self.local_cache_path = m.get_one::<PathBuf>(Self::CACHE).cloned();
+        self.query_cache_path = m.get_one::<PathBuf>(Self::QUERY_CACHE).cloned();
         self.tls_cert_path = m.get_one::<PathBuf>(Self::TLS_CERT).cloned();
         self.tls_key_path = m.get_one::<PathBuf>(Self::TLS_KEY).cloned();
         self.domain_address = m.get_one::<Url>(Self::DOMAIN_URI).cloned();
@@ -394,6 +422,10 @@ impl FromArgMatches for Cli {
             .get_one::<u64>(Self::CACHE_SIZE)
             .cloned()
             .expect("default value for cache size");
+        self.query_cache_size = m
+            .get_one(Self::QUERY_CACHE_SIZE)
+            .cloned()
+            .expect("default value for query cache size");
         self.username = m
             .get_one::<String>(Self::USERNAME)
             .cloned()
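The `--query-cache-size` flag above accepts human-readable sizes via `validation::cache_size`, whose implementation is not part of this diff. As a rough, self-contained sketch of what such a parser involves (the function name, accepted units, and decimal-vs-binary handling here are assumptions, not the PR's actual code):

// Hypothetical sketch of a human-readable size parser; the PR's real
// `validation::cache_size` may differ.
fn parse_size(input: &str) -> Result<u64, String> {
    let input = input.trim();
    // Split the numeric prefix from the unit suffix, e.g. "1.5" + "GiB".
    let split = input
        .find(|c: char| !c.is_ascii_digit() && c != '.')
        .unwrap_or(input.len());
    let (num, unit) = input.split_at(split);
    let value: f64 = num.parse().map_err(|e| format!("invalid number: {e}"))?;
    let multiplier: u64 = match unit.trim().to_ascii_lowercase().as_str() {
        "" | "b" => 1,
        "kb" => 1000,
        "mb" => 1000_u64.pow(2),
        "gb" => 1000_u64.pow(3),
        "kib" => 1024,
        "mib" => 1024_u64.pow(2),
        "gib" => 1024_u64.pow(3),
        other => return Err(format!("unknown unit: {other}")),
    };
    Ok((value * multiplier as f64) as u64)
}

fn main() {
    // The flag's default "1GiB" resolves to 1073741824 bytes.
    assert_eq!(parse_size("1GiB").unwrap(), 1024 * 1024 * 1024);
    assert_eq!(parse_size("100MB").unwrap(), 100_000_000);
}

With the value normalized to bytes, the server can compare the cache directory's on-disk footprint against this budget.
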
9 changes: 2 additions & 7 deletions server/src/event/writer/file_writer.rs
@@ -49,10 +49,7 @@ impl FileWriter {
     ) -> Result<(), StreamWriterError> {
         match self.get_mut(schema_key) {
             Some(writer) => {
-                writer
-                    .writer
-                    .write(record)
-                    .map_err(StreamWriterError::Writer)?;
+                writer.writer.write(record)?;
             }
             // entry is not present thus we create it
             None => {
@@ -100,8 +97,6 @@ fn init_new_stream_writer_file(
     let mut stream_writer = StreamWriter::try_new(file, &record.schema())
         .expect("File and RecordBatch both are checked");
 
-    stream_writer
-        .write(record)
-        .map_err(StreamWriterError::Writer)?;
+    stream_writer.write(record)?;
     Ok((path, stream_writer))
 }
3 changes: 3 additions & 0 deletions server/src/handlers.rs
@@ -23,6 +23,9 @@ pub mod livetail;
 const PREFIX_TAGS: &str = "x-p-tag-";
 const PREFIX_META: &str = "x-p-meta-";
 const STREAM_NAME_HEADER_KEY: &str = "x-p-stream";
+const CACHE_RESULTS_HEADER_KEY: &str = "x-p-cache-results";
+const CACHE_VIEW_HEADER_KEY: &str = "x-p-show-cached";
+const USER_ID_HEADER_KEY: &str = "x-p-user-id";
 const LOG_SOURCE_KEY: &str = "x-p-log-source";
 const TIME_PARTITION_KEY: &str = "x-p-time-partition";
 const TIME_PARTITION_LIMIT_KEY: &str = "x-p-time-partition-limit";
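These three new headers are how clients opt in to result caching: x-p-cache-results asks the server to store the result set, x-p-show-cached asks it to serve a previously cached set, and x-p-user-id scopes cache entries to a user. A hypothetical client call might look like the following; the endpoint path, JSON body shape, and credentials are assumptions based on Parseable's query API, only the header names come from this diff (requires the tokio and serde_json crates, plus reqwest with its "json" feature):

use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let resp = reqwest::Client::new()
        .post("http://localhost:8000/api/v1/query")
        .basic_auth("admin", Some("admin"))
        // Ask the server to cache this result set for later reuse.
        .header("x-p-cache-results", "true")
        // Cache entries are scoped to this user.
        .header("x-p-user-id", "admin")
        .json(&json!({
            "query": "select * from demo",
            "startTime": "2024-05-01T00:00:00+00:00",
            "endTime": "2024-05-02T00:00:00+00:00",
        }))
        .send()
        .await?;
    println!("{}", resp.text().await?);
    Ok(())
}

On a later identical query, sending x-p-show-cached: true should let the server answer from the cache instead of re-executing the query.
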
137 changes: 92 additions & 45 deletions server/src/handlers/airplane.rs
@@ -1,13 +1,29 @@
+/*
+ * Parseable Server (C) 2022 - 2024 Parseable, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
 use arrow_array::RecordBatch;
-use arrow_flight::encode::FlightDataEncoderBuilder;
 use arrow_flight::flight_service_server::FlightServiceServer;
 use arrow_flight::PollInfo;
 use arrow_schema::ArrowError;
 
 use datafusion::common::tree_node::TreeNode;
 use serde_json::json;
 use std::net::SocketAddr;
-use std::sync::Arc;
 use std::time::Instant;
 use tonic::codec::CompressionEncoding;
 
@@ -16,34 +32,38 @@ use futures_util::{Future, TryFutureExt};
 use tonic::transport::{Identity, Server, ServerTlsConfig};
 use tonic_web::GrpcWebLayer;
 
-use crate::event::commit_schema;
 use crate::handlers::http::cluster::get_ingestor_info;
-use crate::handlers::http::fetch_schema;
 
+use crate::handlers::{CACHE_RESULTS_HEADER_KEY, CACHE_VIEW_HEADER_KEY, USER_ID_HEADER_KEY};
 use crate::metrics::QUERY_EXECUTE_TIME;
-use crate::option::{Mode, CONFIG};
+use crate::option::CONFIG;
 
 use crate::handlers::livetail::cross_origin_config;
 
-use crate::handlers::http::query::{authorize_and_set_filter_tags, into_query};
+use crate::handlers::http::query::{
+    authorize_and_set_filter_tags, into_query, put_results_in_cache, update_schema_when_distributed,
+};
 use crate::query::{TableScanVisitor, QUERY_SESSION};
-use crate::storage::object_storage::commit_schema_to_storage;
+use crate::querycache::QueryCacheManager;
 use crate::utils::arrow::flight::{
-    append_temporary_events, get_query_from_ticket, run_do_get_rpc, send_to_ingester,
+    append_temporary_events, get_query_from_ticket, into_flight_data, run_do_get_rpc,
+    send_to_ingester,
 };
 use arrow_flight::{
     flight_service_server::FlightService, Action, ActionType, Criteria, Empty, FlightData,
     FlightDescriptor, FlightInfo, HandshakeRequest, HandshakeResponse, PutResult, SchemaAsIpc,
     SchemaResult, Ticket,
 };
 use arrow_ipc::writer::IpcWriteOptions;
-use futures::{stream, TryStreamExt};
+use futures::stream;
 use tonic::{Request, Response, Status, Streaming};
 
 use crate::handlers::livetail::extract_session_key;
 use crate::metadata::STREAM_INFO;
 use crate::rbac::Users;
 
+use super::http::query::get_results_from_cache;
+
 #[derive(Clone, Debug)]
 pub struct AirServiceImpl {}
 
@@ -112,7 +132,7 @@ impl FlightService for AirServiceImpl {
     async fn do_get(&self, req: Request<Ticket>) -> Result<Response<Self::DoGetStream>, Status> {
         let key = extract_session_key(req.metadata())?;
 
-        let ticket = get_query_from_ticket(req)?;
+        let ticket = get_query_from_ticket(&req)?;
 
         log::info!("query requested to airplane: {:?}", ticket);
 
@@ -132,32 +152,57 @@ impl FlightService for AirServiceImpl {
         let mut visitor = TableScanVisitor::default();
         let _ = raw_logical_plan.visit(&mut visitor);
 
-        let tables = visitor.into_inner();
-
-        if CONFIG.parseable.mode == Mode::Query {
-            // using http to get the schema. may update to use flight later
-            for table in tables {
-                if let Ok(new_schema) = fetch_schema(&table).await {
-                    // commit schema merges the schema internally and updates the schema in storage.
-                    commit_schema_to_storage(&table, new_schema.clone())
-                        .await
-                        .map_err(|err| Status::internal(err.to_string()))?;
-                    commit_schema(&table, Arc::new(new_schema))
-                        .map_err(|err| Status::internal(err.to_string()))?;
-                }
-            }
-        }
+        let streams = visitor.into_inner();
 
+        let query_cache_manager = QueryCacheManager::global(CONFIG.parseable.query_cache_size)
+            .await
+            .unwrap_or(None);
+
+        let cache_results = req
+            .metadata()
+            .get(CACHE_RESULTS_HEADER_KEY)
+            .and_then(|value| value.to_str().ok()); // I dont think we need to own this.
+
+        let show_cached = req
+            .metadata()
+            .get(CACHE_VIEW_HEADER_KEY)
+            .and_then(|value| value.to_str().ok());
+
+        let user_id = req
+            .metadata()
+            .get(USER_ID_HEADER_KEY)
+            .and_then(|value| value.to_str().ok());
+        let stream_name = streams
+            .first()
+            .ok_or_else(|| Status::aborted("Malformed SQL Provided, Table Name Not Found"))?
+            .to_owned();
+
+        // send the cached results
+        if let Ok(cache_results) = get_results_from_cache(
+            show_cached,
+            query_cache_manager,
+            &stream_name,
+            user_id,
+            &ticket.start_time,
+            &ticket.end_time,
+            &ticket.query,
+            ticket.send_null,
+            ticket.fields,
+        )
+        .await
+        {
+            return cache_results.into_flight();
+        }
+
+        update_schema_when_distributed(streams)
+            .await
+            .map_err(|err| Status::internal(err.to_string()))?;
+
         // map payload to query
         let mut query = into_query(&ticket, &session_state)
             .await
             .map_err(|_| Status::internal("Failed to parse query"))?;
 
-        // if table name is not present it is a Malformed Query
-        let stream_name = query
-            .first_table_name()
-            .ok_or_else(|| Status::invalid_argument("Malformed Query"))?;
-
         let event =
             if send_to_ingester(query.start.timestamp_millis(), query.end.timestamp_millis()) {
                 let sql = format!("select * from {}", &stream_name);
@@ -192,11 +237,26 @@ impl FlightService for AirServiceImpl {
                 Status::permission_denied("User Does not have permission to access this")
             })?;
         let time = Instant::now();
-        let (results, _) = query
+        let (records, _) = query
            .execute(stream_name.clone())
            .await
            .map_err(|err| Status::internal(err.to_string()))?;
 
+        if let Err(err) = put_results_in_cache(
+            cache_results,
+            user_id,
+            query_cache_manager,
+            &stream_name,
+            &records,
+            query.start.to_rfc3339(),
+            query.end.to_rfc3339(),
+            ticket.query,
+        )
+        .await
+        {
+            log::error!("{}", err);
+        };
+
         /*
         * INFO: No returning the schema with the data.
        * kept it in case it needs to be sent in the future.
@@ -208,18 +268,7 @@ impl FlightService for AirServiceImpl {
        .collect::<Vec<_>>();
        let schema = Schema::try_merge(schemas).map_err(|err| Status::internal(err.to_string()))?;
        */
-        let input_stream = futures::stream::iter(results.into_iter().map(Ok));
-        let write_options = IpcWriteOptions::default()
-            .try_with_compression(Some(arrow_ipc::CompressionType(1)))
-            .map_err(|err| Status::failed_precondition(err.to_string()))?;
-
-        let flight_data_stream = FlightDataEncoderBuilder::new()
-            .with_max_flight_data_size(usize::MAX)
-            .with_options(write_options)
-            // .with_schema(schema.into())
-            .build(input_stream);
-
-        let flight_data_stream = flight_data_stream.map_err(|err| Status::unknown(err.to_string()));
+        let out = into_flight_data(records);
 
         if let Some(event) = event {
             event.clear(&stream_name);
@@ -230,9 +279,7 @@ impl FlightService for AirServiceImpl {
             .with_label_values(&[&format!("flight-query-{}", stream_name)])
             .observe(time);
 
-        Ok(Response::new(
-            Box::pin(flight_data_stream) as Self::DoGetStream
-        ))
+        out
     }
 
     async fn do_put(
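The commit "change the key for cache" indicates cached results are looked up by a composite key, and the flow above gathers that key's parts from the request headers and the ticket (user, stream, query, start/end time). The exact key format and on-disk layout are not visible in this excerpt; the following self-contained sketch only illustrates the keying idea, with an in-memory map standing in for QueryCacheManager (which appears to persist Arrow files under the query-cache path and enforce the configured size limit):

use std::collections::HashMap;

// Illustrative cache key; field set inferred from the do_get flow above,
// not the PR's actual key format.
#[derive(Hash, PartialEq, Eq, Clone)]
struct CacheKey {
    user_id: String,
    stream: String,
    query: String,
    start_time: String,
    end_time: String,
}

// Stand-in for QueryCacheManager: Vec<String> rows instead of RecordBatches.
struct QueryCache {
    entries: HashMap<CacheKey, Vec<String>>,
}

impl QueryCache {
    fn new() -> Self {
        Self { entries: HashMap::new() }
    }

    fn get(&self, key: &CacheKey) -> Option<&Vec<String>> {
        self.entries.get(key)
    }

    fn put(&mut self, key: CacheKey, rows: Vec<String>) {
        self.entries.insert(key, rows);
    }
}

fn main() {
    let key = CacheKey {
        user_id: "admin".into(),
        stream: "demo".into(),
        query: "select * from demo".into(),
        start_time: "2024-05-01T00:00:00+00:00".into(),
        end_time: "2024-05-02T00:00:00+00:00".into(),
    };
    let mut cache = QueryCache::new();
    assert!(cache.get(&key).is_none()); // first request: miss, execute the query
    cache.put(key.clone(), vec!["row-1".into()]); // x-p-cache-results: true
    assert!(cache.get(&key).is_some()); // x-p-show-cached: true now hits
}

Any change to the query text, time range, or user produces a different key, which matches the observed behavior that cached results are only reused for an identical query by the same user.
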
3 changes: 2 additions & 1 deletion server/src/handlers/http.rs
@@ -26,6 +26,7 @@ use crate::option::CONFIG;
 use self::{cluster::get_ingestor_info, query::Query};
 
 pub(crate) mod about;
+mod cache;
 pub mod cluster;
 pub(crate) mod health_check;
 pub(crate) mod ingest;
@@ -39,7 +40,7 @@ mod otel;
 pub(crate) mod query;
 pub(crate) mod rbac;
 pub(crate) mod role;
-
+pub mod users;
 pub const MAX_EVENT_PAYLOAD_SIZE: usize = 10485760;
 pub const API_BASE_PATH: &str = "api";
 pub const API_VERSION: &str = "v1";