Skip to content

Commit

Permalink
fix: response when user gives end time greater than current time
Browse files Browse the repository at this point in the history
if the user give end time that is greater than `Utc::now()`. Response
was not correct
  • Loading branch information
Eshanatnight committed May 15, 2024
1 parent bc7ab70 commit 5775e92
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 61 deletions.
73 changes: 39 additions & 34 deletions server/src/handlers/airplane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ use arrow_array::RecordBatch;
use arrow_flight::encode::FlightDataEncoderBuilder;
use arrow_flight::flight_service_server::FlightServiceServer;
use arrow_flight::PollInfo;
use arrow_schema::{ArrowError, Schema};
use arrow_schema::ArrowError;

use chrono::Utc;
use datafusion::common::tree_node::TreeNode;
use serde_json::json;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Instant;
Expand All @@ -28,7 +28,9 @@ use crate::handlers::livetail::cross_origin_config;
use crate::handlers::http::query::{authorize_and_set_filter_tags, into_query};
use crate::query::{TableScanVisitor, QUERY_SESSION};
use crate::storage::object_storage::commit_schema_to_storage;
use crate::utils::arrow::flight::{append_temporary_events, get_query_from_ticket, run_do_get_rpc};
use crate::utils::arrow::flight::{
append_temporary_events, get_query_from_ticket, run_do_get_rpc, send_to_ingester,
};
use arrow_flight::{
flight_service_server::FlightService, Action, ActionType, Criteria, Empty, FlightData,
FlightDescriptor, FlightInfo, HandshakeRequest, HandshakeResponse, PutResult, SchemaAsIpc,
Expand All @@ -42,9 +44,6 @@ use crate::handlers::livetail::extract_session_key;
use crate::metadata::STREAM_INFO;
use crate::rbac::Users;

const L_CURLY: char = '{';
const R_CURLY: char = '}';

#[derive(Clone, Debug)]
pub struct AirServiceImpl {}

Expand Down Expand Up @@ -159,50 +158,56 @@ impl FlightService for AirServiceImpl {
.first_table_name()
.ok_or_else(|| Status::invalid_argument("Malformed Query"))?;

let time_delta = query.end - Utc::now();

let event = if CONFIG.parseable.mode == Mode::Query && time_delta.num_seconds() < 2 {
let sql = format!(
"{}\"query\": \"select * from {}\"{}",
L_CURLY, &stream_name, R_CURLY
);
let ingester_metadatas = get_ingestor_info()
.await
.map_err(|err| Status::failed_precondition(err.to_string()))?;
let mut minute_result: Vec<RecordBatch> = vec![];

for im in ingester_metadatas {
if let Ok(mut batches) = run_do_get_rpc(im, sql.clone()).await {
minute_result.append(&mut batches);
let event =
if send_to_ingester(query.start.timestamp_millis(), query.end.timestamp_millis()) {
let sql = format!("select * from {}", &stream_name);
let start_time = ticket.start_time.clone();
let end_time = ticket.end_time.clone();
let out_ticket = json!({
"query": sql,
"startTime": start_time,
"endTime": end_time
})
.to_string();

let ingester_metadatas = get_ingestor_info()
.await
.map_err(|err| Status::failed_precondition(err.to_string()))?;
let mut minute_result: Vec<RecordBatch> = vec![];

for im in ingester_metadatas {
if let Ok(mut batches) = run_do_get_rpc(im, out_ticket.clone()).await {
minute_result.append(&mut batches);
}
}
}
let mr = minute_result.iter().collect::<Vec<_>>();
let event = append_temporary_events(&stream_name, mr).await?;

Some(event)
} else {
None
};
let mr = minute_result.iter().collect::<Vec<_>>();
let event = append_temporary_events(&stream_name, mr).await?;
Some(event)
} else {
None
};
let permissions = Users.get_permissions(&key);

authorize_and_set_filter_tags(&mut query, permissions, &stream_name).map_err(|_| {
Status::permission_denied("User Does not have permission to access this")
})?;

let time = Instant::now();
let (results, _) = query
.execute(stream_name.clone())
.await
.map_err(|err| Status::internal(err.to_string()))
.unwrap();
.map_err(|err| Status::internal(err.to_string()))?;

/*
* INFO: No returning the schema with the data.
* kept it in case it needs to be sent in the future.
let schemas = results
.iter()
.map(|batch| batch.schema())
.map(|s| s.as_ref().clone())
.collect::<Vec<_>>();
let _schema =
Schema::try_merge(schemas).map_err(|err| Status::internal(err.to_string()))?;
let schema = Schema::try_merge(schemas).map_err(|err| Status::internal(err.to_string()))?;
*/
let input_stream = futures::stream::iter(results.into_iter().map(Ok));
let write_options = IpcWriteOptions::default()
.try_with_compression(Some(arrow_ipc::CompressionType(1)))
Expand Down
2 changes: 1 addition & 1 deletion server/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

mod filter_optimizer;
mod listing_table_builder;
mod stream_schema_provider;
pub mod stream_schema_provider;

use chrono::{DateTime, Utc};
use chrono::{NaiveDateTime, TimeZone};
Expand Down
2 changes: 1 addition & 1 deletion server/src/query/stream_schema_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -607,7 +607,7 @@ fn is_overlapping_query(
false
}

fn include_now(filters: &[Expr], time_partition: Option<String>) -> bool {
pub fn include_now(filters: &[Expr], time_partition: Option<String>) -> bool {
let current_minute = Utc::now()
.with_second(0)
.and_then(|x| x.with_nanosecond(0))
Expand Down
70 changes: 45 additions & 25 deletions server/src/utils/arrow/flight.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,47 +2,35 @@ use crate::event::Event;
use crate::handlers::http::ingest::push_logs_unchecked;
use crate::handlers::http::query::Query as QueryJson;
use crate::metadata::STREAM_INFO;
use crate::query::stream_schema_provider::include_now;
use crate::{
handlers::http::modal::IngestorMetadata,
option::{Mode, CONFIG},
};

use arrow_array::RecordBatch;
use arrow_flight::Ticket;
use arrow_select::concat::concat_batches;
use datafusion::logical_expr::BinaryExpr;
use datafusion::prelude::Expr;
use datafusion::scalar::ScalarValue;
use futures::TryStreamExt;
use serde_json::Value as JsonValue;

use tonic::{Request, Status};

use arrow_flight::FlightClient;
use http::Uri;
use tonic::transport::Channel;

pub fn get_query_from_ticket(req: Request<Ticket>) -> Result<QueryJson, Status> {
if CONFIG.parseable.mode == Mode::Ingest {
let inner = req.into_inner().ticket;
let query = serde_json::from_slice::<JsonValue>(&inner)
.map_err(|_| Status::failed_precondition("Ticket is not valid json"))?["query"]
.as_str()
.ok_or_else(|| Status::failed_precondition("query is not valid string"))?
.to_owned();
Ok(QueryJson {
query,
send_null: false,
fields: false,
filter_tags: None,
// we can use humantime because into_query handle parseing
end_time: String::from("now"),
start_time: String::from("1min"),
})
} else {
Ok(
serde_json::from_slice::<QueryJson>(&req.into_inner().ticket)
.map_err(|err| Status::internal(err.to_string()))?,
)
}
serde_json::from_slice::<QueryJson>(&req.into_inner().ticket)
.map_err(|err| Status::internal(err.to_string()))
}

pub async fn run_do_get_rpc(im: IngestorMetadata, sql: String) -> Result<Vec<RecordBatch>, Status> {
pub async fn run_do_get_rpc(
im: IngestorMetadata,
ticket: String,
) -> Result<Vec<RecordBatch>, Status> {
let url = im
.domain_name
.rsplit_once(':')
Expand Down Expand Up @@ -72,7 +60,7 @@ pub async fn run_do_get_rpc(im: IngestorMetadata, sql: String) -> Result<Vec<Rec

let response = client
.do_get(Ticket {
ticket: sql.clone().into(),
ticket: ticket.into(),
})
.await?;

Expand All @@ -99,3 +87,35 @@ pub async fn append_temporary_events(
.map_err(|err| Status::internal(err.to_string()))?;
Ok(event)
}

pub fn send_to_ingester(start: i64, end: i64) -> bool {
let filter_start = lit_timestamp_milli(
start, //query.start.timestamp_millis()
);
let filter_end = lit_timestamp_milli(
end, //query.end.timestamp_millis()
);

let expr_left = Expr::Column(datafusion::common::Column {
relation: None,
name: "p_timestamp".to_owned(),
});

let ex1 = BinaryExpr::new(
Box::new(expr_left.clone()),
datafusion::logical_expr::Operator::Gt,
Box::new(filter_start),
);
let ex2 = BinaryExpr::new(
Box::new(expr_left),
datafusion::logical_expr::Operator::Lt,
Box::new(filter_end),
);
let ex = [Expr::BinaryExpr(ex1), Expr::BinaryExpr(ex2)];

CONFIG.parseable.mode == Mode::Query && include_now(&ex, None)
}

fn lit_timestamp_milli(time: i64) -> Expr {
Expr::Literal(ScalarValue::TimestampMillisecond(Some(time), None))
}

0 comments on commit 5775e92

Please sign in to comment.