From 38b2e242272558071902cdf8f713a88507791e0e Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Thu, 25 Apr 2024 17:16:47 +0530 Subject: [PATCH] add staging query in airplane response(TBT) --- server/src/handlers/airplane.rs | 96 +++++++++++++++++++++++---- server/src/handlers/http/modal/mod.rs | 9 ++- server/src/handlers/http/query.rs | 8 +-- server/src/storage/staging.rs | 1 + 4 files changed, 96 insertions(+), 18 deletions(-) diff --git a/server/src/handlers/airplane.rs b/server/src/handlers/airplane.rs index aefbeea7c..b5615ce99 100644 --- a/server/src/handlers/airplane.rs +++ b/server/src/handlers/airplane.rs @@ -1,16 +1,23 @@ +use arrow_array::RecordBatch; use arrow_flight::flight_service_server::FlightServiceServer; -use arrow_flight::PollInfo; +use arrow_flight::{FlightClient, PollInfo}; use arrow_schema::ArrowError; +use chrono::Utc; use datafusion::common::tree_node::TreeNode; +use futures::TryStreamExt; +use http::Uri; +use itertools::Itertools; +use serde_json::Value as JsonValue; use std::net::SocketAddr; use std::sync::Arc; use futures_util::{Future, TryFutureExt}; -use tonic::transport::{Identity, Server, ServerTlsConfig}; +use tonic::transport::{Channel, Identity, Server, ServerTlsConfig}; use tonic_web::GrpcWebLayer; use crate::event::commit_schema; +use crate::handlers::http::cluster::get_ingestor_info; use crate::handlers::http::fetch_schema; use crate::option::{Mode, CONFIG}; @@ -103,8 +110,27 @@ impl FlightService for AirServiceImpl { async fn do_get(&self, req: Request) -> Result, Status> { let key = extract_session_key(req.metadata())?; - let ticket = serde_json::from_slice::(&req.into_inner().ticket) - .map_err(|err| Status::internal(err.to_string()))?; + + let ticket = if CONFIG.parseable.mode == Mode::Ingest { + let query = serde_json::from_slice::(&req.into_inner().ticket) + .map_err(|_| Status::failed_precondition("Ticket is not valid json"))?["query"] + .as_str() + .ok_or_else(|| Status::failed_precondition("query is not valid 
string"))? + .to_owned(); + QueryJson { + query, + send_null: false, + fields: false, + filter_tags: None, + // we can use humantime because into_query handles parsing + end_time: String::from("now"), + start_time: String::from("1min"), + } + } else { + serde_json::from_slice::(&req.into_inner().ticket) + .map_err(|err| Status::internal(err.to_string()))? + }; + log::info!("airplane requested for query {:?}", ticket); // get the query session_state @@ -144,6 +170,49 @@ impl FlightService for AirServiceImpl { let mut query = into_query(&ticket, &session_state) .await .map_err(|_| Status::internal("Failed to parse query"))?; + let time_delta = query.end - Utc::now(); + + let minute_result = if CONFIG.parseable.mode == Mode::Query && time_delta.num_seconds() < 2 + { + let sql = ticket.query.clone(); + let ingester_metadatas = get_ingestor_info() + .await + .map_err(|err| Status::failed_precondition(err.to_string()))?; + let mut minute_result: Vec = vec![]; + + for im in ingester_metadatas.iter() { + let mut url = im.domain_name.rsplit(":").collect_vec(); + let _ = url.pop(); + url.reverse(); + url.push(&im.flight_port); + let url = url.join(""); + let url = url + .parse::() + .map_err(|_| Status::failed_precondition("Ingester metadata is courupted"))?; + + let channel = Channel::builder(url) + .connect() + .await + .map_err(|err| Status::failed_precondition(err.to_string()))?; + + let mut client = FlightClient::new(channel); + client.add_header("authorization", &im.token)?; + + let response = client + .do_get(Ticket { + ticket: sql.clone().into(), + }) + .await?; + + let mut batches: Vec = response.try_collect().await?; + + minute_result.append(&mut batches); + } + + Some(minute_result) + } else { + None + }; // if table name is not present it is a Malformed Query let stream_name = query @@ -152,20 +221,23 @@ impl FlightService for AirServiceImpl { let permissions = Users.get_permissions(&key); - let table_name = query - .first_table_name() - .ok_or_else(|| 
Status::invalid_argument("Malformed Query"))?; - authorize_and_set_filter_tags(&mut query, permissions, &table_name).map_err(|_| { + authorize_and_set_filter_tags(&mut query, permissions, &stream_name).map_err(|_| { Status::permission_denied("User Does not have permission to access this") })?; - let (results, _) = query - .execute(table_name.clone()) - .await - .map_err(|err| Status::internal(err.to_string()))?; let schema = STREAM_INFO .schema(&stream_name) .map_err(|err| Status::failed_precondition(err.to_string()))?; + + let (mut results, _) = query + .execute(stream_name) + .await + .map_err(|err| Status::internal(err.to_string()))?; + + if let Some(mut minute_result) = minute_result { + results.append(&mut minute_result); + } + let options = datafusion::arrow::ipc::writer::IpcWriteOptions::default(); let schema_flight_data = SchemaAsIpc::new(&schema, &options); diff --git a/server/src/handlers/http/modal/mod.rs b/server/src/handlers/http/modal/mod.rs index edd7bd3c3..f50d8c31a 100644 --- a/server/src/handlers/http/modal/mod.rs +++ b/server/src/handlers/http/modal/mod.rs @@ -62,6 +62,7 @@ pub struct IngestorMetadata { pub bucket_name: String, pub token: String, pub ingestor_id: String, + pub flight_port: String, } impl IngestorMetadata { @@ -73,6 +74,7 @@ impl IngestorMetadata { username: &str, password: &str, ingestor_id: String, + flight_port: String, ) -> Self { let token = base64::prelude::BASE64_STANDARD.encode(format!("{}:{}", username, password)); @@ -85,6 +87,7 @@ impl IngestorMetadata { bucket_name, token, ingestor_id, + flight_port, } } @@ -110,9 +113,10 @@ mod test { "admin", "admin", "ingestor_id".to_string(), + "8002".to_string(), ); - let rhs = serde_json::from_slice::(br#"{"version":"v3","port":"8000","domain_name":"https://localhost:8000","bucket_name":"somebucket","token":"Basic YWRtaW46YWRtaW4=", "ingestor_id": "ingestor_id"}"#).unwrap(); + let rhs = 
serde_json::from_slice::(br#"{"version":"v3","port":"8000","domain_name":"https://localhost:8000","bucket_name":"somebucket","token":"Basic YWRtaW46YWRtaW4=", "ingestor_id": "ingestor_id","flight_port": "8002"}"#).unwrap(); assert_eq!(rhs, lhs); } @@ -127,13 +131,14 @@ mod test { "admin", "admin", "ingestor_id".to_string(), + "8002".to_string(), ); let lhs = serde_json::to_string(&im) .unwrap() .try_into_bytes() .unwrap(); - let rhs = br#"{"version":"v3","port":"8000","domain_name":"https://localhost:8000","bucket_name":"somebucket","token":"Basic YWRtaW46YWRtaW4=","ingestor_id":"ingestor_id"}"# + let rhs = br#"{"version":"v3","port":"8000","domain_name":"https://localhost:8000","bucket_name":"somebucket","token":"Basic YWRtaW46YWRtaW4=","ingestor_id":"ingestor_id","flight_port":"8002"}"# .try_into_bytes() .unwrap(); diff --git a/server/src/handlers/http/query.rs b/server/src/handlers/http/query.rs index a7ef7a581..c3f456e60 100644 --- a/server/src/handlers/http/query.rs +++ b/server/src/handlers/http/query.rs @@ -19,7 +19,7 @@ use actix_web::http::header::ContentType; use actix_web::web::{self, Json}; use actix_web::{FromRequest, HttpRequest, Responder}; -use chrono::{DateTime, Utc}; +use chrono::{DateTime, Duration, Utc}; use datafusion::common::tree_node::TreeNode; use datafusion::error::DataFusionError; use datafusion::execution::context::SessionState; @@ -51,14 +51,14 @@ use crate::utils::actix::extract_session_key_from_req; #[serde(rename_all = "camelCase")] pub struct Query { pub query: String, - start_time: String, - end_time: String, + pub start_time: String, + pub end_time: String, #[serde(default)] pub send_null: bool, #[serde(skip)] pub fields: bool, #[serde(skip)] - filter_tags: Option>, + pub filter_tags: Option>, } pub async fn query(req: HttpRequest, query_request: Query) -> Result { diff --git a/server/src/storage/staging.rs b/server/src/storage/staging.rs index 512a9c4c0..80e85e9cd 100644 --- a/server/src/storage/staging.rs +++ 
b/server/src/storage/staging.rs @@ -366,6 +366,7 @@ pub fn get_ingestor_info() -> anyhow::Result { &CONFIG.parseable.username, &CONFIG.parseable.password, get_ingestor_id(), + CONFIG.parseable.flight_port, ); put_ingestor_info(out.clone())?;