update arrow flight server to use the query cache
Eshanatnight committed May 21, 2024
1 parent 419c5e2 commit e168cdd
Showing 7 changed files with 133 additions and 73 deletions.
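This commit routes Arrow Flight DoGet queries through the same result cache the HTTP /query handler uses: caching is driven by three request headers (CACHE_RESULTS_HEADER_KEY, CACHE_VIEW_HEADER_KEY, USER_ID_HEADER_KEY), which the Flight server now reads from gRPC request metadata, as the airplane.rs diff below shows. A minimal client sketch of the new path — the literal header-key strings, the ticket's JSON field names, and the address are assumptions, since none of them appear in this diff:

use arrow_flight::{flight_service_client::FlightServiceClient, Ticket};
use tonic::Request;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Address is a placeholder for the server's Flight endpoint.
    let mut client = FlightServiceClient::connect("http://localhost:8001").await?;

    // Ticket body: a JSON query of the shape get_query_from_ticket parses;
    // field names here are illustrative, not taken from this diff.
    let ticket = Ticket {
        ticket: br#"{"query":"select * from mystream","startTime":"2024-05-20T00:00:00Z","endTime":"2024-05-21T00:00:00Z"}"#
            .to_vec()
            .into(),
    };

    let mut req = Request::new(ticket);
    // Hypothetical header-key strings; the real values are the
    // CACHE_RESULTS_HEADER_KEY and USER_ID_HEADER_KEY constants in crate::handlers.
    req.metadata_mut().insert("cache-results", "true".parse()?);
    req.metadata_mut().insert("user-id", "alice".parse()?);

    // Cached or freshly executed, results arrive as a FlightData stream.
    let mut stream = client.do_get(req).await?.into_inner();
    while let Some(flight_data) = stream.message().await? {
        println!("frame: {} data bytes", flight_data.data_body.len());
    }
    Ok(())
}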
116 changes: 71 additions & 45 deletions server/src/handlers/airplane.rs
@@ -17,15 +17,13 @@
*/

use arrow_array::RecordBatch;
use arrow_flight::encode::FlightDataEncoderBuilder;
use arrow_flight::flight_service_server::FlightServiceServer;
use arrow_flight::PollInfo;
use arrow_schema::ArrowError;

use datafusion::common::tree_node::TreeNode;
use serde_json::json;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Instant;
use tonic::codec::CompressionEncoding;

@@ -34,34 +32,38 @@ use futures_util::{Future, TryFutureExt};
use tonic::transport::{Identity, Server, ServerTlsConfig};
use tonic_web::GrpcWebLayer;

use crate::event::commit_schema;
use crate::handlers::http::cluster::get_ingestor_info;
use crate::handlers::http::fetch_schema;

use crate::handlers::{CACHE_RESULTS_HEADER_KEY, CACHE_VIEW_HEADER_KEY, USER_ID_HEADER_KEY};
use crate::metrics::QUERY_EXECUTE_TIME;
use crate::option::{Mode, CONFIG};
use crate::option::CONFIG;

use crate::handlers::livetail::cross_origin_config;

use crate::handlers::http::query::{authorize_and_set_filter_tags, into_query};
use crate::handlers::http::query::{
authorize_and_set_filter_tags, into_query, put_results_in_cache, update_schema_when_distributed,
};
use crate::query::{TableScanVisitor, QUERY_SESSION};
use crate::storage::object_storage::commit_schema_to_storage;
use crate::querycache::QueryCacheManager;
use crate::utils::arrow::flight::{
append_temporary_events, get_query_from_ticket, run_do_get_rpc, send_to_ingester,
append_temporary_events, get_query_from_ticket, into_flight_data, run_do_get_rpc,
send_to_ingester,
};
use arrow_flight::{
flight_service_server::FlightService, Action, ActionType, Criteria, Empty, FlightData,
FlightDescriptor, FlightInfo, HandshakeRequest, HandshakeResponse, PutResult, SchemaAsIpc,
SchemaResult, Ticket,
};
use arrow_ipc::writer::IpcWriteOptions;
use futures::{stream, TryStreamExt};
use futures::stream;
use tonic::{Request, Response, Status, Streaming};

use crate::handlers::livetail::extract_session_key;
use crate::metadata::STREAM_INFO;
use crate::rbac::Users;

use super::http::query::get_results_from_cache;

#[derive(Clone, Debug)]
pub struct AirServiceImpl {}

@@ -130,7 +132,7 @@ impl FlightService for AirServiceImpl {
async fn do_get(&self, req: Request<Ticket>) -> Result<Response<Self::DoGetStream>, Status> {
let key = extract_session_key(req.metadata())?;

let ticket = get_query_from_ticket(req)?;
let ticket = get_query_from_ticket(&req)?;

log::info!("query requested to airplane: {:?}", ticket);

@@ -150,32 +152,57 @@
let mut visitor = TableScanVisitor::default();
let _ = raw_logical_plan.visit(&mut visitor);

let tables = visitor.into_inner();

if CONFIG.parseable.mode == Mode::Query {
// using http to get the schema. may update to use flight later
for table in tables {
if let Ok(new_schema) = fetch_schema(&table).await {
// commit schema merges the schema internally and updates the schema in storage.
commit_schema_to_storage(&table, new_schema.clone())
.await
.map_err(|err| Status::internal(err.to_string()))?;
commit_schema(&table, Arc::new(new_schema))
.map_err(|err| Status::internal(err.to_string()))?;
}
}
let streams = visitor.into_inner();

let query_cache_manager = QueryCacheManager::global(CONFIG.parseable.query_cache_size)
.await
.unwrap_or(None);

let cache_results = req
.metadata()
.get(CACHE_RESULTS_HEADER_KEY)
.and_then(|value| value.to_str().ok()); // I don't think we need to own this.

let show_cached = req
.metadata()
.get(CACHE_VIEW_HEADER_KEY)
.and_then(|value| value.to_str().ok());

let user_id = req
.metadata()
.get(USER_ID_HEADER_KEY)
.and_then(|value| value.to_str().ok());
let stream_name = streams
.first()
.ok_or_else(|| Status::aborted("Malformed SQL Provided, Table Name Not Found"))?
.to_owned();

// send the cached results
if let Ok(cache_results) = get_results_from_cache(
show_cached,
query_cache_manager,
&stream_name,
user_id,
&ticket.start_time,
&ticket.end_time,
&ticket.query,
ticket.send_null,
ticket.fields,
)
.await
{
return cache_results.into_flight();
}

update_schema_when_distributed(streams)
.await
.map_err(|err| Status::internal(err.to_string()))?;

// map payload to query
let mut query = into_query(&ticket, &session_state)
.await
.map_err(|_| Status::internal("Failed to parse query"))?;

// if table name is not present it is a Malformed Query
let stream_name = query
.first_table_name()
.ok_or_else(|| Status::invalid_argument("Malformed Query"))?;

let event =
if send_to_ingester(query.start.timestamp_millis(), query.end.timestamp_millis()) {
let sql = format!("select * from {}", &stream_name);
@@ -210,11 +237,23 @@
Status::permission_denied("User Does not have permission to access this")
})?;
let time = Instant::now();
let (results, _) = query
let (records, _) = query
.execute(stream_name.clone())
.await
.map_err(|err| Status::internal(err.to_string()))?;

put_results_in_cache(
cache_results,
user_id,
query_cache_manager,
&stream_name,
&records,
query.start.to_rfc3339(),
query.end.to_rfc3339(),
ticket.query,
)
.await;

/*
* INFO: Not returning the schema with the data.
* Kept it in case it needs to be sent in the future.
@@ -226,18 +265,7 @@
.collect::<Vec<_>>();
let schema = Schema::try_merge(schemas).map_err(|err| Status::internal(err.to_string()))?;
*/
let input_stream = futures::stream::iter(results.into_iter().map(Ok));
let write_options = IpcWriteOptions::default()
.try_with_compression(Some(arrow_ipc::CompressionType(1)))
.map_err(|err| Status::failed_precondition(err.to_string()))?;

let flight_data_stream = FlightDataEncoderBuilder::new()
.with_max_flight_data_size(usize::MAX)
.with_options(write_options)
// .with_schema(schema.into())
.build(input_stream);

let flight_data_stream = flight_data_stream.map_err(|err| Status::unknown(err.to_string()));
let out = into_flight_data(records);

if let Some(event) = event {
event.clear(&stream_name);
Expand All @@ -248,9 +276,7 @@ impl FlightService for AirServiceImpl {
.with_label_values(&[&format!("flight-query-{}", stream_name)])
.observe(time);

Ok(Response::new(
Box::pin(flight_data_stream) as Self::DoGetStream
))
out
}

async fn do_put(
…
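The inline FlightData encoding removed above moved behind into_flight_data in crate::utils::arrow::flight (one of the changed files not rendered on this page). A sketch of what that helper plausibly looks like, reconstructed from the deleted lines and the DoGetStream import in response.rs — not the actual implementation:

use std::pin::Pin;

use arrow_array::RecordBatch;
use arrow_flight::{encode::FlightDataEncoderBuilder, FlightData};
use arrow_ipc::writer::IpcWriteOptions;
use futures::{stream, Stream, TryStreamExt};
use tonic::{Response, Status};

pub type DoGetStream = Pin<Box<dyn Stream<Item = Result<FlightData, Status>> + Send>>;

pub fn into_flight_data(records: Vec<RecordBatch>) -> Result<Response<DoGetStream>, Status> {
    let input_stream = stream::iter(records.into_iter().map(Ok));

    // CompressionType(1) is ZSTD in arrow-ipc's generated constants,
    // the same option the removed inline code passed.
    let write_options = IpcWriteOptions::default()
        .try_with_compression(Some(arrow_ipc::CompressionType(1)))
        .map_err(|err| Status::failed_precondition(err.to_string()))?;

    let flight_data_stream = FlightDataEncoderBuilder::new()
        .with_max_flight_data_size(usize::MAX)
        .with_options(write_options)
        .build(input_stream)
        .map_err(|err| Status::unknown(err.to_string()));

    Ok(Response::new(Box::pin(flight_data_stream) as DoGetStream))
}

Factoring the encoder out lets both do_get in airplane.rs and QueryResponse::into_flight in response.rs share one exit path.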
1 change: 0 additions & 1 deletion server/src/handlers/http/modal/query_server.rs
@@ -180,7 +180,6 @@ impl QueryServer {
FILTERS.load().await?;
DASHBOARDS.load().await?;


// load data from stats back to prometheus metrics
metrics::fetch_stats_from_storage().await;
metrics::reset_daily_metric_from_global();
…
1 change: 0 additions & 1 deletion server/src/handlers/http/modal/server.rs
@@ -475,7 +475,6 @@ impl Server {
FILTERS.load().await?;
DASHBOARDS.load().await?;


metrics::fetch_stats_from_storage().await;
metrics::reset_daily_metric_from_global();
storage::retention::load_retention_from_global();
…
42 changes: 23 additions & 19 deletions server/src/handlers/http/query.rs
Expand Up @@ -25,7 +25,7 @@ use datafusion::common::tree_node::TreeNode;
use datafusion::error::DataFusionError;
use datafusion::execution::context::SessionState;
use futures_util::Future;
use http::{HeaderValue, StatusCode};
use http::StatusCode;
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Arc;
@@ -85,9 +85,18 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Responder
.await
.unwrap_or(None);

let cache_results = req.headers().get(CACHE_RESULTS_HEADER_KEY);
let show_cached = req.headers().get(CACHE_VIEW_HEADER_KEY);
let user_id = req.headers().get(USER_ID_HEADER_KEY);
let cache_results = req
.headers()
.get(CACHE_RESULTS_HEADER_KEY)
.and_then(|value| value.to_str().ok());
let show_cached = req
.headers()
.get(CACHE_VIEW_HEADER_KEY)
.and_then(|value| value.to_str().ok());
let user_id = req
.headers()
.get(USER_ID_HEADER_KEY)
.and_then(|value| value.to_str().ok());

// deal with cached data
if let Ok(results) = get_results_from_cache(
@@ -151,7 +160,7 @@
Ok(response)
}

async fn update_schema_when_distributed(tables: Vec<String>) -> Result<(), QueryError> {
pub async fn update_schema_when_distributed(tables: Vec<String>) -> Result<(), QueryError> {
if CONFIG.parseable.mode == Mode::Query {
for table in tables {
if let Ok(new_schema) = fetch_schema(&table).await {
@@ -167,9 +176,9 @@ async fn update_schema_when_distributed(tables: Vec<String>) -> Result<(), QueryError
}

#[allow(clippy::too_many_arguments)]
async fn put_results_in_cache(
cache_results: Option<&HeaderValue>,
user_id: Option<&HeaderValue>,
pub async fn put_results_in_cache(
cache_results: Option<&str>,
user_id: Option<&str>,
query_cache_manager: Option<&QueryCacheManager>,
stream: &str,
records: &[RecordBatch],
@@ -185,10 +194,7 @@
}
// do cache
(Some(_), Some(query_cache_manager)) => {
let user_id = user_id
.expect("User Id was provided")
.to_str()
.expect("is proper ASCII");
let user_id = user_id.expect("User Id was provided");

if let Err(err) = query_cache_manager
.create_parquet_cache(stream, records, user_id, start, end, query)
@@ -209,11 +215,11 @@
}

#[allow(clippy::too_many_arguments)]
async fn get_results_from_cache(
show_cached: Option<&HeaderValue>,
pub async fn get_results_from_cache(
show_cached: Option<&str>,
query_cache_manager: Option<&QueryCacheManager>,
stream: &str,
user_id: Option<&HeaderValue>,
user_id: Option<&str>,
start_time: &str,
end_time: &str,
query: &str,
@@ -228,10 +234,8 @@
None
}
(Some(_), Some(query_cache_manager)) => {
let user_id = user_id
.ok_or_else(|| QueryError::Anyhow(anyhow!("User Id not provided")))?
.to_str()
.map_err(|err| anyhow!(err))?;
let user_id =
user_id.ok_or_else(|| QueryError::Anyhow(anyhow!("User Id not provided")))?;

let mut query_cache = query_cache_manager.get_cache(stream, user_id).await?;

…
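Note the signature change in this file: put_results_in_cache and get_results_from_cache are now pub and take Option<&str> where they previously took Option<&HeaderValue>. Each transport converts its own header type at the call site, which keeps the shared cache functions transport-agnostic. A sketch of the two conversions as they appear at the call sites — illustrative helpers, not functions from this commit:

use actix_web::http::header::HeaderMap; // HTTP side
use tonic::metadata::MetadataMap; // gRPC / Flight side

// HTTP: actix HeaderValue -> Option<&str>, as in handlers/http/query.rs
fn header_str<'a>(headers: &'a HeaderMap, key: &str) -> Option<&'a str> {
    headers.get(key).and_then(|value| value.to_str().ok())
}

// Flight: tonic MetadataValue -> Option<&str>, as in handlers/airplane.rs
fn metadata_str<'a>(metadata: &'a MetadataMap, key: &str) -> Option<&'a str> {
    metadata.get(key).and_then(|value| value.to_str().ok())
}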
13 changes: 12 additions & 1 deletion server/src/response.rs
@@ -16,11 +16,18 @@
*
*/

use crate::{handlers::http::query::QueryError, utils::arrow::record_batches_to_json};
use crate::{
handlers::http::query::QueryError,
utils::arrow::{
flight::{into_flight_data, DoGetStream},
record_batches_to_json,
},
};
use actix_web::{web, Responder};
use datafusion::arrow::record_batch::RecordBatch;
use itertools::Itertools;
use serde_json::{json, Value};
use tonic::{Response, Status};

pub struct QueryResponse {
pub records: Vec<RecordBatch>,
@@ -57,4 +64,8 @@ impl QueryResponse {

Ok(web::Json(response))
}

pub fn into_flight(self) -> Result<Response<DoGetStream>, Status> {
into_flight_data(self.records)
}
}
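With this method, the same QueryResponse value serves both transports: to_http serializes records to JSON for the REST path, while into_flight hands them to the shared Flight encoder. A hypothetical wrapper, just to show the new exit — not code from this commit:

use crate::{response::QueryResponse, utils::arrow::flight::DoGetStream};
use tonic::{Response, Status};

// Flight handler fragment: boxed FlightData stream via the shared encoder.
fn flight_exit(resp: QueryResponse) -> Result<Response<DoGetStream>, Status> {
    // into_flight delegates to utils::arrow::flight::into_flight_data(resp.records).
    resp.into_flight()
}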
2 changes: 1 addition & 1 deletion server/src/storage/object_storage.rs
@@ -26,8 +26,8 @@ use super::{
};

use crate::handlers::http::modal::ingest_server::INGESTOR_META;
use crate::metrics::{LIFETIME_EVENTS_STORAGE_SIZE, STORAGE_SIZE_TODAY};
use crate::handlers::http::users::{DASHBOARDS_DIR, FILTER_DIR, USERS_ROOT_DIR};
use crate::metrics::{LIFETIME_EVENTS_STORAGE_SIZE, STORAGE_SIZE_TODAY};
use crate::option::Mode;
use crate::{
alerts::Alerts,
…
… (1 more changed file not shown)
