Skip to content

Commit

Permalink
add staging query in airplane response(TBT)
Browse files Browse the repository at this point in the history
  • Loading branch information
Eshanatnight committed Apr 25, 2024
1 parent b2d3301 commit 38b2e24
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 18 deletions.
96 changes: 84 additions & 12 deletions server/src/handlers/airplane.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,23 @@
use arrow_array::RecordBatch;
use arrow_flight::flight_service_server::FlightServiceServer;
use arrow_flight::PollInfo;
use arrow_flight::{FlightClient, PollInfo};
use arrow_schema::ArrowError;
use chrono::Utc;
use datafusion::common::tree_node::TreeNode;
use futures::TryStreamExt;
use http::Uri;
use itertools::Itertools;
use serde_json::Value as JsonValue;
use std::net::SocketAddr;
use std::sync::Arc;

use futures_util::{Future, TryFutureExt};

use tonic::transport::{Identity, Server, ServerTlsConfig};
use tonic::transport::{Channel, Identity, Server, ServerTlsConfig};
use tonic_web::GrpcWebLayer;

use crate::event::commit_schema;
use crate::handlers::http::cluster::get_ingestor_info;
use crate::handlers::http::fetch_schema;
use crate::option::{Mode, CONFIG};

Expand Down Expand Up @@ -103,8 +110,27 @@ impl FlightService for AirServiceImpl {

async fn do_get(&self, req: Request<Ticket>) -> Result<Response<Self::DoGetStream>, Status> {
let key = extract_session_key(req.metadata())?;
let ticket = serde_json::from_slice::<QueryJson>(&req.into_inner().ticket)
.map_err(|err| Status::internal(err.to_string()))?;

let ticket = if CONFIG.parseable.mode == Mode::Ingest {
let query = serde_json::from_slice::<JsonValue>(&req.into_inner().ticket)
.map_err(|_| Status::failed_precondition("Ticket is not valid json"))?["query"]
.as_str()
.ok_or_else(|| Status::failed_precondition("query is not valid string"))?
.to_owned();
QueryJson {
query,
send_null: false,
fields: false,
filter_tags: None,
// we can use humantime because into_query handle parseing
end_time: String::from("now"),
start_time: String::from("1min"),
}
} else {
serde_json::from_slice::<QueryJson>(&req.into_inner().ticket)
.map_err(|err| Status::internal(err.to_string()))?
};

log::info!("airplane requested for query {:?}", ticket);

// get the query session_state
Expand Down Expand Up @@ -144,6 +170,49 @@ impl FlightService for AirServiceImpl {
let mut query = into_query(&ticket, &session_state)
.await
.map_err(|_| Status::internal("Failed to parse query"))?;
let time_delta = query.end - Utc::now();

let minute_result = if CONFIG.parseable.mode == Mode::Query && time_delta.num_seconds() < 2
{
let sql = ticket.query.clone();
let ingester_metadatas = get_ingestor_info()
.await
.map_err(|err| Status::failed_precondition(err.to_string()))?;
let mut minute_result: Vec<RecordBatch> = vec![];

for im in ingester_metadatas.iter() {
let mut url = im.domain_name.rsplit(":").collect_vec();
let _ = url.pop();
url.reverse();
url.push(&im.flight_port);
let url = url.join("");
let url = url
.parse::<Uri>()
.map_err(|_| Status::failed_precondition("Ingester metadata is courupted"))?;

let channel = Channel::builder(url)
.connect()
.await
.map_err(|err| Status::failed_precondition(err.to_string()))?;

let mut client = FlightClient::new(channel);
client.add_header("authorization", &im.token)?;

let response = client
.do_get(Ticket {
ticket: sql.clone().into(),
})
.await?;

let mut batches: Vec<RecordBatch> = response.try_collect().await?;

minute_result.append(&mut batches);
}

Some(minute_result)
} else {
None
};

// if table name is not present it is a Malformed Query
let stream_name = query
Expand All @@ -152,20 +221,23 @@ impl FlightService for AirServiceImpl {

let permissions = Users.get_permissions(&key);

let table_name = query
.first_table_name()
.ok_or_else(|| Status::invalid_argument("Malformed Query"))?;
authorize_and_set_filter_tags(&mut query, permissions, &table_name).map_err(|_| {
authorize_and_set_filter_tags(&mut query, permissions, &stream_name).map_err(|_| {
Status::permission_denied("User Does not have permission to access this")
})?;

let (results, _) = query
.execute(table_name.clone())
.await
.map_err(|err| Status::internal(err.to_string()))?;
let schema = STREAM_INFO
.schema(&stream_name)
.map_err(|err| Status::failed_precondition(err.to_string()))?;

let (mut results, _) = query
.execute(stream_name)
.await
.map_err(|err| Status::internal(err.to_string()))?;

if let Some(mut minute_result) = minute_result {
results.append(&mut minute_result);
}

let options = datafusion::arrow::ipc::writer::IpcWriteOptions::default();
let schema_flight_data = SchemaAsIpc::new(&schema, &options);

Expand Down
9 changes: 7 additions & 2 deletions server/src/handlers/http/modal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ pub struct IngestorMetadata {
pub bucket_name: String,
pub token: String,
pub ingestor_id: String,
pub flight_port: String,
}

impl IngestorMetadata {
Expand All @@ -73,6 +74,7 @@ impl IngestorMetadata {
username: &str,
password: &str,
ingestor_id: String,
flight_port: String,
) -> Self {
let token = base64::prelude::BASE64_STANDARD.encode(format!("{}:{}", username, password));

Expand All @@ -85,6 +87,7 @@ impl IngestorMetadata {
bucket_name,
token,
ingestor_id,
flight_port,
}
}

Expand All @@ -110,9 +113,10 @@ mod test {
"admin",
"admin",
"ingestor_id".to_string(),
"8002".to_string(),
);

let rhs = serde_json::from_slice::<IngestorMetadata>(br#"{"version":"v3","port":"8000","domain_name":"https://localhost:8000","bucket_name":"somebucket","token":"Basic YWRtaW46YWRtaW4=", "ingestor_id": "ingestor_id"}"#).unwrap();
let rhs = serde_json::from_slice::<IngestorMetadata>(br#"{"version":"v3","port":"8000","domain_name":"https://localhost:8000","bucket_name":"somebucket","token":"Basic YWRtaW46YWRtaW4=", "ingestor_id": "ingestor_id","flight_port": "8002"}"#).unwrap();

assert_eq!(rhs, lhs);
}
Expand All @@ -127,13 +131,14 @@ mod test {
"admin",
"admin",
"ingestor_id".to_string(),
"8002".to_string(),
);

let lhs = serde_json::to_string(&im)
.unwrap()
.try_into_bytes()
.unwrap();
let rhs = br#"{"version":"v3","port":"8000","domain_name":"https://localhost:8000","bucket_name":"somebucket","token":"Basic YWRtaW46YWRtaW4=","ingestor_id":"ingestor_id"}"#
let rhs = br#"{"version":"v3","port":"8000","domain_name":"https://localhost:8000","bucket_name":"somebucket","token":"Basic YWRtaW46YWRtaW4=","ingestor_id":"ingestor_id","flight_port":"8002"}"#
.try_into_bytes()
.unwrap();

Expand Down
8 changes: 4 additions & 4 deletions server/src/handlers/http/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
use actix_web::http::header::ContentType;
use actix_web::web::{self, Json};
use actix_web::{FromRequest, HttpRequest, Responder};
use chrono::{DateTime, Utc};
use chrono::{DateTime, Duration, Utc};

Check failure on line 22 in server/src/handlers/http/query.rs

View workflow job for this annotation

GitHub Actions / Cargo Clippy check

unused import: `Duration`

Check warning on line 22 in server/src/handlers/http/query.rs

View workflow job for this annotation

GitHub Actions / Unit tests

unused import: `Duration`
use datafusion::common::tree_node::TreeNode;
use datafusion::error::DataFusionError;
use datafusion::execution::context::SessionState;
Expand Down Expand Up @@ -51,14 +51,14 @@ use crate::utils::actix::extract_session_key_from_req;
#[serde(rename_all = "camelCase")]
pub struct Query {
pub query: String,
start_time: String,
end_time: String,
pub start_time: String,
pub end_time: String,
#[serde(default)]
pub send_null: bool,
#[serde(skip)]
pub fields: bool,
#[serde(skip)]
filter_tags: Option<Vec<String>>,
pub filter_tags: Option<Vec<String>>,
}

pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Responder, QueryError> {
Expand Down
1 change: 1 addition & 0 deletions server/src/storage/staging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,7 @@ pub fn get_ingestor_info() -> anyhow::Result<IngestorMetadata> {
&CONFIG.parseable.username,
&CONFIG.parseable.password,
get_ingestor_id(),
CONFIG.parseable.flight_port,

Check failure on line 369 in server/src/storage/staging.rs

View workflow job for this annotation

GitHub Actions / Cargo Clippy check

mismatched types

Check failure on line 369 in server/src/storage/staging.rs

View workflow job for this annotation

GitHub Actions / Unit tests

mismatched types
);

put_ingestor_info(out.clone())?;
Expand Down

0 comments on commit 38b2e24

Please sign in to comment.