From 1581309669b9a36008eab27e996bf02d33cc7ed6 Mon Sep 17 00:00:00 2001
From: Eshan Chatterjee
Date: Fri, 19 Apr 2024 12:32:46 +0530
Subject: [PATCH 01/18] make some fields pub

---
 server/src/handlers/http/query.rs | 6 +++---
 server/src/handlers/livetail.rs   | 7 ++++---
 server/src/main.rs                | 1 -
 3 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/server/src/handlers/http/query.rs b/server/src/handlers/http/query.rs
index 96e0766a9..c3ddff0c4 100644
--- a/server/src/handlers/http/query.rs
+++ b/server/src/handlers/http/query.rs
@@ -54,9 +54,9 @@ pub struct Query {
     start_time: String,
     end_time: String,
     #[serde(default)]
-    send_null: bool,
+    pub send_null: bool,
     #[serde(skip)]
-    fields: bool,
+    pub fields: bool,
     #[serde(skip)]
     filter_tags: Option<Vec<String>>,
 }
@@ -183,7 +183,7 @@ impl FromRequest for Query {
     }
 }
 
-async fn into_query(
+pub async fn into_query(
     query: &Query,
     session_state: &SessionState,
 ) -> Result<LogicalQuery, QueryError> {
diff --git a/server/src/handlers/livetail.rs b/server/src/handlers/livetail.rs
index 76b6f8005..3de8426e7 100644
--- a/server/src/handlers/livetail.rs
+++ b/server/src/handlers/livetail.rs
@@ -231,7 +231,7 @@ pub fn server() -> impl Future<Output = Result<(), Box<dyn std::error::Error + Send>>> + Send
 
-fn extract_stream(body: &serde_json::Value) -> Result<&str, Status> {
+pub fn extract_stream(body: &serde_json::Value) -> Result<&str, Status> {
     body.as_object()
         .ok_or(Status::invalid_argument("expected object in request body"))?
         .get("stream")
         .and_then(|stream| stream.as_str())
         .ok_or(Status::invalid_argument("stream key value is invalid"))
 }
 
@@ -240,7 +240,7 @@ fn extract_stream(body: &serde_json::Value) -> Result<&str, Status> {
-fn extract_session_key(headers: &MetadataMap) -> Result<SessionKey, Status> {
+pub fn extract_session_key(headers: &MetadataMap) -> Result<SessionKey, Status> {
     // Extract username and password from the request using basic auth extractor.
     let basic = extract_basic_auth(headers).map(|creds| SessionKey::BasicAuth {
         username: creds.user_id,
@@ -286,6 +286,7 @@ fn extract_cookie(header: &MetadataMap) -> Option<Cookie>
         .find(|cookie| cookie.name() == SESSION_COOKIE_NAME)
 }
 
-fn cross_origin_config() -> CorsLayer {
+#[inline(always)]
+pub fn cross_origin_config() -> CorsLayer {
     CorsLayer::very_permissive().allow_credentials(true)
 }
diff --git a/server/src/main.rs b/server/src/main.rs
index 04d6ed8b7..58f0a1c1c 100644
--- a/server/src/main.rs
+++ b/server/src/main.rs
@@ -50,7 +50,6 @@ use crate::{
     handlers::http::modal::{
         ingest_server::IngestServer, query_server::QueryServer, server::Server,
     },
-    // localcache::LocalCacheManager,
 };
 
 pub const STORAGE_UPLOAD_INTERVAL: u32 = 60;

From cc59404f53a5a616d582da0b69947c74cae5adf5 Mon Sep 17 00:00:00 2001
From: Eshan Chatterjee
Date: Fri, 19 Apr 2024 12:44:54 +0530
Subject: [PATCH 02/18] impl arrow flight protocol for querying

---
 server/src/cli.rs                               |  28 +-
 server/src/handlers.rs                          |   1 +
 server/src/handlers/airplane.rs                 | 303 ++++++++++++++++++
 .../src/handlers/http/modal/ingest_server.rs    |   4 +
 .../src/handlers/http/modal/query_server.rs     |   3 +
 server/src/handlers/http/modal/server.rs        |   1 +
 server/src/handlers/http/query.rs               |   2 +-
 server/src/main.rs                              |   6 +-
 8 files changed, 338 insertions(+), 10 deletions(-)
 create mode 100644 server/src/handlers/airplane.rs

diff --git a/server/src/cli.rs b/server/src/cli.rs
index 2ad9899cd..cd3f8cf7a 100644
--- a/server/src/cli.rs
+++ b/server/src/cli.rs
@@ -89,6 +89,9 @@ pub struct Cli {
 
     /// public address for the parseable server ingestor
     pub ingestor_endpoint: String,
+
+    /// port used by airplane (the Arrow Flight query service)
+    pub flight_port: u16,
 }
 
 impl Cli {
@@ -118,6 +121,7 @@ impl Cli {
     pub const INGESTOR_ENDPOINT: &'static str = "ingestor-endpoint";
     pub const DEFAULT_USERNAME: &'static str = "admin";
     pub const DEFAULT_PASSWORD: &'static str = "admin";
+    pub const FLIGHT_PORT: &'static str = "flight-port";
 
     pub fn local_stream_data_path(&self, stream_name: &str) -> PathBuf {
         self.local_staging_path.join(stream_name)
@@ -275,6 +279,16 @@ impl Cli {
                 .value_parser(value_parser!(u16))
                 .help("Port for gRPC server"),
         )
+        .arg(
+            Arg::new(Self::FLIGHT_PORT)
+                .long(Self::FLIGHT_PORT)
+                .env("P_FLIGHT_PORT")
+                .value_name("PORT")
+                .default_value("8002")
+                .required(false)
+                .value_parser(value_parser!(u16))
+                .help("Port for the Arrow Flight query engine"),
+        )
         .arg(
             Arg::new(Self::LIVETAIL_CAPACITY)
                 .long(Self::LIVETAIL_CAPACITY)
@@ -317,11 +331,11 @@ impl Cli {
                 .help("Mode of operation"),
         )
         .arg(
-                Arg::new(Self::INGESTOR_ENDPOINT)
-                    .long(Self::INGESTOR_ENDPOINT)
-                    .env("P_INGESTOR_ENDPOINT")
-                    .value_name("URL")
-                    .required(false)
+            Arg::new(Self::INGESTOR_ENDPOINT)
+                .long(Self::INGESTOR_ENDPOINT)
+                .env("P_INGESTOR_ENDPOINT")
+                .value_name("URL")
+                .required(false)
                 .help("URL to connect to this specific ingestor. Default is the address of the server.")
         )
         .arg(
@@ -401,6 +415,10 @@ impl FromArgMatches for Cli {
             .get_one::<u16>(Self::GRPC_PORT)
             .cloned()
             .expect("default for livetail port");
+        self.flight_port = m
+            .get_one::<u16>(Self::FLIGHT_PORT)
+            .cloned()
+            .expect("default for flight port");
         self.livetail_channel_capacity = m
             .get_one::<usize>(Self::LIVETAIL_CAPACITY)
             .cloned()
diff --git a/server/src/handlers.rs b/server/src/handlers.rs
index d610011cf..aa227122b 100644
--- a/server/src/handlers.rs
+++ b/server/src/handlers.rs
@@ -16,6 +16,7 @@
  *
  */
 
+pub mod airplane;
 pub mod http;
 pub mod livetail;
 
diff --git a/server/src/handlers/airplane.rs b/server/src/handlers/airplane.rs
new file mode 100644
index 000000000..a08062fa5
--- /dev/null
+++ b/server/src/handlers/airplane.rs
@@ -0,0 +1,303 @@
+use arrow_flight::flight_service_server::FlightServiceServer;
+use arrow_schema::ArrowError;
+use datafusion::common::tree_node::TreeNode;
+use std::net::SocketAddr;
+use std::sync::Arc;
+
+use futures_util::{Future, TryFutureExt};
+
+use tonic::transport::{Identity, Server, ServerTlsConfig};
+use tonic_web::GrpcWebLayer;
+
+use crate::event::commit_schema;
+use crate::handlers::http::fetch_schema;
+use crate::option::{Mode, CONFIG};
+
+use crate::handlers::livetail::cross_origin_config;
+
+use crate::handlers::http::query::{into_query, Query as QueryJson};
+use crate::query::{TableScanVisitor, QUERY_SESSION};
+use crate::rbac::role::Permission;
+use crate::storage::object_storage::commit_schema_to_storage;
+use arrow_ipc::writer::{DictionaryTracker, IpcDataGenerator, IpcWriteOptions};
+use futures::stream::BoxStream;
+
+use tonic::{Request, Response, Status, Streaming};
+
+use arrow_flight::{
+    flight_service_server::FlightService, Action, ActionType, Criteria, Empty, FlightData,
+    FlightDescriptor, FlightInfo, HandshakeRequest, HandshakeResponse, PutResult, SchemaAsIpc,
+    SchemaResult, Ticket,
+};
+
+use crate::handlers::livetail::extract_session_key;
+
+use crate::metadata::STREAM_INFO;
+
+use crate::rbac::role::Action as RoleAction;
+use crate::rbac::Users;
+
+#[derive(Clone)]
+pub struct AirServiceImpl {}
+
+#[tonic::async_trait]
+impl FlightService for AirServiceImpl {
+    type HandshakeStream = BoxStream<'static, Result<HandshakeResponse, Status>>;
+    type ListFlightsStream = BoxStream<'static, Result<FlightInfo, Status>>;
+    type DoGetStream = BoxStream<'static, Result<FlightData, Status>>;
+    type DoPutStream = BoxStream<'static, Result<PutResult, Status>>;
+    type DoActionStream = BoxStream<'static, Result<arrow_flight::Result, Status>>;
+    type ListActionsStream = BoxStream<'static, Result<ActionType, Status>>;
+    type DoExchangeStream = BoxStream<'static, Result<FlightData, Status>>;
+
+    async fn handshake(
+        &self,
+        _request: Request<Streaming<HandshakeRequest>>,
+    ) -> Result<Response<Self::HandshakeStream>, Status> {
+        Err(Status::unimplemented(
+            "handshake is disabled in favour of direct authentication and authorization",
+        ))
+    }
+
+    /// list_flights is an operation that allows a client
+    /// to query a Flight server for information
+    /// about available datasets or "flights" that the server can provide.
+    async fn list_flights(
+        &self,
+        _request: Request<Criteria>,
+    ) -> Result<Response<Self::ListFlightsStream>, Status> {
+        Err(Status::unimplemented("Implement list_flights"))
+    }
+
+    async fn get_flight_info(
+        &self,
+        _request: Request<FlightDescriptor>,
+    ) -> Result<Response<FlightInfo>, Status> {
+        Err(Status::unimplemented("Implement get_flight_info"))
+    }
+
+    async fn get_schema(
+        &self,
+        request: Request<FlightDescriptor>,
+    ) -> Result<Response<SchemaResult>, Status> {
+        let table_name = request.into_inner().path;
+        let table_name = table_name[0].clone();
+
+        let schema = STREAM_INFO
+            .schema(&table_name)
+            .map_err(|err| Status::failed_precondition(err.to_string()))?;
+
+        let options = IpcWriteOptions::default();
+        let schema_result = SchemaAsIpc::new(&schema, &options)
+            .try_into()
+            .map_err(|err: ArrowError| Status::internal(err.to_string()))?;
+
+        Ok(Response::new(schema_result))
+    }
+
+    async fn do_get(&self, req: Request<Ticket>) -> Result<Response<Self::DoGetStream>, Status> {
+        let key = extract_session_key(req.metadata())?;
+        let ticket = serde_json::from_slice::<QueryJson>(&req.into_inner().ticket)
+            .map_err(|err| Status::internal(err.to_string()))?;
+        log::info!("airplane requested for query {:?}", ticket);
+
+        // get the query session_state
+        let session_state = QUERY_SESSION.state();
+
+        // get the logical plan and extract the table name
+        let raw_logical_plan = session_state
+            .create_logical_plan(&ticket.query)
+            .await
+            .map_err(|err| {
+                log::error!("Failed to create logical plan: {}", err);
+                Status::internal("Failed to create logical plan")
+            })?;
+
+        // create a visitor to extract the table name
+        let mut visitor = TableScanVisitor::default();
+        let _ = raw_logical_plan.visit(&mut visitor);
+
+        let table_name = visitor
+            .into_inner()
+            .pop()
+            .ok_or(Status::invalid_argument("No table found from sql"))?;
+
+        if CONFIG.parseable.mode == Mode::Query {
+            // using http to get the schema. may update to use flight later
+            if let Ok(new_schema) = fetch_schema(&table_name).await {
+                // commit schema merges the schema internally and updates the schema in storage.
+                commit_schema_to_storage(&table_name, new_schema.clone())
+                    .await
+                    .map_err(|err| Status::internal(err.to_string()))?;
+                commit_schema(&table_name, Arc::new(new_schema))
+                    .map_err(|err| Status::internal(err.to_string()))?;
+            }
+        }
+
+        // map payload to query
+        let mut query = into_query(&ticket, &session_state)
+            .await
+            .map_err(|_| Status::internal("Failed to parse query"))?;
+
+        // if table name is not present it is a Malformed Query
+        let stream_name = query
+            .table_name()
+            .ok_or(Status::invalid_argument("Malformed Query"))?;
+
+        let permissions = Users.get_permissions(&key);
+
+        let table_name = query.table_name();
+        if let Some(ref table) = table_name {
+            let mut authorized = false;
+            let mut tags = Vec::new();
+
+            // in permission check if user can run query on the stream.
+            // also while iterating add any filter tags for this stream
+            for permission in permissions {
+                match permission {
+                    Permission::Stream(RoleAction::All, _) => {
+                        authorized = true;
+                        break;
+                    }
+                    Permission::StreamWithTag(RoleAction::Query, ref stream, tag)
+                        if stream == table || stream == "*" =>
+                    {
+                        authorized = true;
+                        if let Some(tag) = tag {
+                            tags.push(tag)
+                        }
+                    }
+                    _ => (),
+                }
+            }
+
+            if !authorized {
+                return Err(Status::permission_denied("User Not Authorized"));
+            }
+
+            if !tags.is_empty() {
+                query.filter_tag = Some(tags)
+            }
+        }
+
+        let (results, _) = query
+            .execute(table_name.clone().unwrap())
+            .await
+            .map_err(|err| Status::internal(err.to_string()))?;
+        let schema = STREAM_INFO
+            .schema(&stream_name)
+            .map_err(|err| Status::failed_precondition(err.to_string()))?;
+        let options = datafusion::arrow::ipc::writer::IpcWriteOptions::default();
+        let schema_flight_data = SchemaAsIpc::new(&schema, &options);
+
+        let mut flights = vec![FlightData::from(schema_flight_data)];
+        let encoder = IpcDataGenerator::default();
+        let mut tracker = DictionaryTracker::new(false);
+        for batch in &results {
+            let (flight_dictionaries, flight_batch) = encoder
+                .encoded_batch(batch, &mut tracker, &options)
+                .map_err(|e| Status::internal(e.to_string()))?;
+            flights.extend(flight_dictionaries.into_iter().map(Into::into));
+            flights.push(flight_batch.into());
+        }
+        let output = futures::stream::iter(flights.into_iter().map(Ok));
+        Ok(Response::new(Box::pin(output) as Self::DoGetStream))
+    }
+
+    async fn do_put(
+        &self,
+        _request: Request<Streaming<FlightData>>,
+    ) -> Result<Response<Self::DoPutStream>, Status> {
+        Err(Status::unimplemented(
+            "do_put not implemented because we are only using flight for querying",
+        ))
+    }
+
+    async fn do_action(
+        &self,
+        _request: Request<Action>,
+    ) -> Result<Response<Self::DoActionStream>, Status> {
+        Err(Status::unimplemented(
+            "do_action not implemented because we are only using flight for querying",
+        ))
+    }
+
+    async fn list_actions(
+        &self,
+        _request: Request<Empty>,
+    ) -> Result<Response<Self::ListActionsStream>, Status> {
+        Err(Status::unimplemented(
+            "list_actions not implemented because we are only using flight for querying",
+        ))
+    }
+
+    async fn do_exchange(
+        &self,
+        _request: Request<Streaming<FlightData>>,
+    ) -> Result<Response<Self::DoExchangeStream>, Status> {
+        Err(Status::unimplemented(
+            "do_exchange not implemented because we are only using flight for querying",
+        ))
+    }
+}
+
+pub fn server() -> impl Future<Output = Result<(), Box<dyn std::error::Error + Send>>> + Send {
+    let mut addr: SocketAddr = CONFIG
+        .parseable
+        .address
+        .parse()
+        .expect("valid socket address");
+    addr.set_port(CONFIG.parseable.flight_port);
+
+    let service = AirServiceImpl {};
+
+    let svc = FlightServiceServer::new(service);
+
+    let cors = cross_origin_config();
+
+    let identity = match (
+        &CONFIG.parseable.tls_cert_path,
+        &CONFIG.parseable.tls_key_path,
+    ) {
+        (Some(cert), Some(key)) => {
+            match (std::fs::read_to_string(cert), std::fs::read_to_string(key)) {
+                (Ok(cert_file), Ok(key_file)) => {
+                    let identity = Identity::from_pem(cert_file, key_file);
+                    Some(identity)
+                }
+                _ => None,
+            }
+        }
+        (_, _) => None,
+    };
+
+    let config = identity.map(|id| ServerTlsConfig::new().identity(id));
+
+    // rust is treating closures as different types
+    let err_map_fn = |err| Box::new(err) as Box<dyn std::error::Error + Send>;
+
+    // match on config to decide if we want to use tls or not
+    match config {
+        Some(config) => {
+            let server = match Server::builder().tls_config(config) {
+                Ok(server) => server,
+                Err(_) => Server::builder(),
+            };
+
+            server
+                .accept_http1(true)
+                .layer(cors)
+                .layer(GrpcWebLayer::new())
+                .add_service(svc)
+                .serve(addr)
+                .map_err(err_map_fn)
+        }
+        None => Server::builder()
+            .accept_http1(true)
+            .layer(cors)
+            .layer(GrpcWebLayer::new())
+            .add_service(svc)
+            .serve(addr)
+            .map_err(err_map_fn),
+    }
+}
diff --git a/server/src/handlers/http/modal/ingest_server.rs b/server/src/handlers/http/modal/ingest_server.rs
index c230981af..04194189d 100644
--- a/server/src/handlers/http/modal/ingest_server.rs
+++ b/server/src/handlers/http/modal/ingest_server.rs
@@ -18,6 +18,7 @@
 
 use crate::analytics;
 use crate::banner;
+use crate::handlers::airplane;
 use crate::handlers::http::logstream;
 use crate::handlers::http::middleware::RouteExt;
 use crate::localcache::LocalCacheManager;
@@ -336,7 +337,10 @@ impl IngestServer {
         let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) =
             sync::object_store_sync();
 
+        tokio::spawn(airplane::server());
+
         let app = self.start(prometheus, CONFIG.parseable.openid.clone());
+
         tokio::pin!(app);
         loop {
             tokio::select! {
diff --git a/server/src/handlers/http/modal/query_server.rs b/server/src/handlers/http/modal/query_server.rs
index 1312b407f..607cc5653 100644
--- a/server/src/handlers/http/modal/query_server.rs
+++ b/server/src/handlers/http/modal/query_server.rs
@@ -16,6 +16,7 @@
  *
  */
 
+use crate::handlers::airplane;
 use crate::handlers::http::cluster;
 use crate::handlers::http::middleware::RouteExt;
 use crate::handlers::http::{base_path, cross_origin_config, API_BASE_PATH, API_VERSION};
@@ -183,6 +184,8 @@ impl QueryServer {
             analytics::init_analytics_scheduler()?;
         }
 
+        tokio::spawn(airplane::server());
+
         self.start(prometheus, CONFIG.parseable.openid.clone())
             .await?;
 
diff --git a/server/src/handlers/http/modal/server.rs b/server/src/handlers/http/modal/server.rs
index 72d377139..165581234 100644
--- a/server/src/handlers/http/modal/server.rs
+++ b/server/src/handlers/http/modal/server.rs
@@ -416,6 +416,7 @@ impl Server {
         }
 
         tokio::spawn(handlers::livetail::server());
+        tokio::spawn(handlers::airplane::server());
 
         let app = self.start(prometheus, CONFIG.parseable.openid.clone());
 
diff --git a/server/src/handlers/http/query.rs b/server/src/handlers/http/query.rs
index c3ddff0c4..288613ba5 100644
--- a/server/src/handlers/http/query.rs
+++ b/server/src/handlers/http/query.rs
@@ -50,7 +50,7 @@ use crate::utils::actix::extract_session_key_from_req;
 #[derive(Debug, serde::Deserialize, serde::Serialize)]
 #[serde(rename_all = "camelCase")]
 pub struct Query {
-    query: String,
+    pub query: String,
     start_time: String,
     end_time: String,
     #[serde(default)]
diff --git a/server/src/main.rs b/server/src/main.rs
index 58f0a1c1c..95cbcb919 100644
--- a/server/src/main.rs
+++ b/server/src/main.rs
@@ -46,10 +46,8 @@ use std::sync::Arc;
 use handlers::http::modal::ParseableServer;
 use option::{Mode, CONFIG};
 
-use crate::{
-    handlers::http::modal::{
-        ingest_server::IngestServer, query_server::QueryServer, server::Server,
-    },
+use crate::handlers::http::modal::{
+    ingest_server::IngestServer, query_server::QueryServer, server::Server,
 };
 
 pub const STORAGE_UPLOAD_INTERVAL: u32 = 60;
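[Editor's note] The service above only implements `do_get`, and because `handshake` is unimplemented, authentication rides on request headers. A minimal client-side sketch follows; the port (8002, the `P_FLIGHT_PORT` default added above), host, stream name, and the `Basic YWRtaW46YWRtaW4=` token (base64 of the default `admin:admin`, as seen in the metadata tests later in this series) are illustrative assumptions, not part of the patch:

```rust
// Hypothetical client for the airplane service sketched above.
use arrow_flight::{FlightClient, Ticket};
use futures::TryStreamExt;
use tonic::transport::Channel;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let channel = Channel::from_static("http://localhost:8002")
        .connect()
        .await?;
    let mut client = FlightClient::new(channel);
    // handshake is disabled, so authorization goes in a header instead.
    client.add_header("authorization", "Basic YWRtaW46YWRtaW4=")?;

    // The ticket payload mirrors the HTTP query API's JSON body (camelCase keys).
    let ticket = Ticket {
        ticket: br#"{"query": "select * from mystream", "startTime": "1min", "endTime": "now"}"#
            .to_vec()
            .into(),
    };
    // do_get streams a schema message followed by IPC-encoded record batches.
    let batches: Vec<_> = client.do_get(ticket).await?.try_collect().await?;
    println!("received {} record batches", batches.len());
    Ok(())
}
```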
From 80f1ca09d51dcea6c9d1d85934cea45645274d5a Mon Sep 17 00:00:00 2001
From: Eshan Chatterjee
Date: Sun, 21 Apr 2024 05:23:32 +0530
Subject: [PATCH 03/18] misc: cli updates

---
 server/src/banner.rs |  5 +++--
 server/src/option.rs | 10 ++++++----
 2 files changed, 9 insertions(+), 6 deletions(-)

diff --git a/server/src/banner.rs b/server/src/banner.rs
index ca665ffa4..fa817eac1 100644
--- a/server/src/banner.rs
+++ b/server/src/banner.rs
@@ -49,11 +49,12 @@ fn print_ascii_art() {
 
 fn status_info(config: &Config, scheme: &str, id: Uid) {
     let address = format!(
-        "\"{}://{}\" ({}), \":{}\" (gRPC)",
+        "\"{}://{}\" ({}), \":{}\" (livetail), \":{}\" (flight protocol)",
         scheme,
         config.parseable.address,
         scheme.to_ascii_uppercase(),
-        config.parseable.grpc_port
+        config.parseable.grpc_port,
+        config.parseable.flight_port
     );
 
     let mut credentials =
diff --git a/server/src/option.rs b/server/src/option.rs
index d11df0805..e607c2062 100644
--- a/server/src/option.rs
+++ b/server/src/option.rs
@@ -44,8 +44,11 @@ impl Config {
     fn new() -> Self {
         let cli = create_parseable_cli_command()
             .name("Parseable")
-            .about("A Cloud Native, log analytics platform")
-            .before_help("Log Lake for the cloud-native world")
+            .about(
+                r#"A Cloud Native, log analytics platform
+Log Lake for the cloud-native world
+"#,
+            )
             .arg_required_else_help(true)
             .subcommand_required(true)
             .color(clap::ColorChoice::Always)
@@ -192,11 +195,10 @@ fn create_parseable_cli_command() -> Command {
     command!()
         .name("Parseable")
         .bin_name("parseable")
-        .about("Parseable is a log storage and observability platform.")
         .propagate_version(true)
         .next_line_help(false)
         .help_template(
-            r#"
+            r#"{name} v{version}
 {about}
 Join the community at https://logg.ing/community.

From 3b656e7d8ade8ddd40f235b67f6aa45a2104298e Mon Sep 17 00:00:00 2001
From: Eshan Chatterjee
Date: Wed, 24 Apr 2024 20:31:06 +0530
Subject: [PATCH 04/18] update: fix breaking changes

---
 server/src/handlers/airplane.rs   | 79 ++++++++++++------------------
 server/src/handlers/http/query.rs |  2 +-
 2 files changed, 30 insertions(+), 51 deletions(-)

diff --git a/server/src/handlers/airplane.rs b/server/src/handlers/airplane.rs
index a08062fa5..aefbeea7c 100644
--- a/server/src/handlers/airplane.rs
+++ b/server/src/handlers/airplane.rs
@@ -1,4 +1,5 @@
 use arrow_flight::flight_service_server::FlightServiceServer;
+use arrow_flight::PollInfo;
 use arrow_schema::ArrowError;
 use datafusion::common::tree_node::TreeNode;
 use std::net::SocketAddr;
@@ -15,9 +16,8 @@ use crate::option::{Mode, CONFIG};
 
 use crate::handlers::livetail::cross_origin_config;
 
-use crate::handlers::http::query::{into_query, Query as QueryJson};
+use crate::handlers::http::query::{authorize_and_set_filter_tags, into_query, Query as QueryJson};
 use crate::query::{TableScanVisitor, QUERY_SESSION};
-use crate::rbac::role::Permission;
 use crate::storage::object_storage::commit_schema_to_storage;
 use arrow_ipc::writer::{DictionaryTracker, IpcDataGenerator, IpcWriteOptions};
 use futures::stream::BoxStream;
@@ -34,7 +34,6 @@ use crate::handlers::livetail::extract_session_key;
 
 use crate::metadata::STREAM_INFO;
 
-use crate::rbac::role::Action as RoleAction;
 use crate::rbac::Users;
 
 #[derive(Clone)]
 pub struct AirServiceImpl {}
@@ -69,6 +68,13 @@ impl FlightService for AirServiceImpl {
         Err(Status::unimplemented("Implement list_flights"))
     }
 
+    async fn poll_flight_info(
+        &self,
+        _request: Request<FlightDescriptor>,
+    ) -> Result<Response<PollInfo>, Status> {
+        Err(Status::unimplemented("Implement poll_flight_info"))
+    }
+
     async fn get_flight_info(
         &self,
         _request: Request<FlightDescriptor>,
@@ -117,20 +123,20 @@ impl FlightService for AirServiceImpl {
         // create a visitor to extract the table name
         let mut visitor = TableScanVisitor::default();
         let _ = raw_logical_plan.visit(&mut visitor);
 
-        let table_name = visitor
-            .into_inner()
-            .pop()
-            .ok_or(Status::invalid_argument("No table found from sql"))?;
+        let tables = visitor.into_inner();
 
         if CONFIG.parseable.mode == Mode::Query {
             // using http to get the schema. may update to use flight later
-            if let Ok(new_schema) = fetch_schema(&table_name).await {
-                // commit schema merges the schema internally and updates the schema in storage.
-                commit_schema_to_storage(&table_name, new_schema.clone())
-                    .await
-                    .map_err(|err| Status::internal(err.to_string()))?;
-                commit_schema(&table_name, Arc::new(new_schema))
-                    .map_err(|err| Status::internal(err.to_string()))?;
+            for table in tables {
+                if let Ok(new_schema) = fetch_schema(&table).await {
+                    // commit schema merges the schema internally and updates the schema in storage.
+                    commit_schema_to_storage(&table, new_schema.clone())
+                        .await
+                        .map_err(|err| Status::internal(err.to_string()))?;
+                    commit_schema(&table, Arc::new(new_schema))
+                        .map_err(|err| Status::internal(err.to_string()))?;
+                }
             }
         }
@@ -141,47 +147,20 @@ impl FlightService for AirServiceImpl {
 
         // if table name is not present it is a Malformed Query
         let stream_name = query
-            .table_name()
-            .ok_or(Status::invalid_argument("Malformed Query"))?;
+            .first_table_name()
+            .ok_or_else(|| Status::invalid_argument("Malformed Query"))?;
 
         let permissions = Users.get_permissions(&key);
 
-        let table_name = query.table_name();
-        if let Some(ref table) = table_name {
-            let mut authorized = false;
-            let mut tags = Vec::new();
-
-            // in permission check if user can run query on the stream.
-            // also while iterating add any filter tags for this stream
-            for permission in permissions {
-                match permission {
-                    Permission::Stream(RoleAction::All, _) => {
-                        authorized = true;
-                        break;
-                    }
-                    Permission::StreamWithTag(RoleAction::Query, ref stream, tag)
-                        if stream == table || stream == "*" =>
-                    {
-                        authorized = true;
-                        if let Some(tag) = tag {
-                            tags.push(tag)
-                        }
-                    }
-                    _ => (),
-                }
-            }
-
-            if !authorized {
-                return Err(Status::permission_denied("User Not Authorized"));
-            }
-
-            if !tags.is_empty() {
-                query.filter_tag = Some(tags)
-            }
-        }
+        let table_name = query
+            .first_table_name()
+            .ok_or_else(|| Status::invalid_argument("Malformed Query"))?;
+        authorize_and_set_filter_tags(&mut query, permissions, &table_name).map_err(|_| {
+            Status::permission_denied("User Does not have permission to access this")
+        })?;
 
         let (results, _) = query
-            .execute(table_name.clone().unwrap())
+            .execute(table_name.clone())
             .await
             .map_err(|err| Status::internal(err.to_string()))?;
         let schema = STREAM_INFO
             .schema(&stream_name)
             .map_err(|err| Status::failed_precondition(err.to_string()))?;
diff --git a/server/src/handlers/http/query.rs b/server/src/handlers/http/query.rs
index c3ddff0c4..288613ba5 100644
--- a/server/src/handlers/http/query.rs
+++ b/server/src/handlers/http/query.rs
@@ -116,7 +116,7 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Responder, QueryError>
 
-fn authorize_and_set_filter_tags(
+pub fn authorize_and_set_filter_tags(
     query: &mut LogicalQuery,
     permissions: Vec<Permission>,
     table_name: &str,
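[Editor's note] Besides routing both the HTTP and Flight paths through the shared `authorize_and_set_filter_tags` helper, this patch swaps `ok_or` for `ok_or_else`. The difference is eagerness of error construction, as this standalone sketch shows (the strings are illustrative):

```rust
fn main() {
    let table: Option<&str> = Some("mystream");

    // ok_or builds its error value up front, even though `table` is Some here.
    let _ = table.ok_or(String::from("Malformed Query"));

    // ok_or_else takes a closure, so the error (and its allocation) is only
    // produced on the None path.
    let _ = table.ok_or_else(|| String::from("Malformed Query"));
}
```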
From f6d7c55ddc0ee55ef62a5cf84c13108089228e6c Mon Sep 17 00:00:00 2001
From: Eshan Chatterjee
Date: Thu, 25 Apr 2024 17:16:47 +0530
Subject: [PATCH 05/18] add staging query in airplane response(TBT)

---
 server/src/handlers/airplane.rs       | 96 +++++++++++++++++++++++----
 server/src/handlers/http/modal/mod.rs |  9 ++-
 server/src/handlers/http/query.rs     |  8 +--
 server/src/storage/staging.rs         |  1 +
 4 files changed, 96 insertions(+), 18 deletions(-)

diff --git a/server/src/handlers/airplane.rs b/server/src/handlers/airplane.rs
index aefbeea7c..b5615ce99 100644
--- a/server/src/handlers/airplane.rs
+++ b/server/src/handlers/airplane.rs
@@ -1,16 +1,23 @@
+use arrow_array::RecordBatch;
 use arrow_flight::flight_service_server::FlightServiceServer;
-use arrow_flight::PollInfo;
+use arrow_flight::{FlightClient, PollInfo};
 use arrow_schema::ArrowError;
+use chrono::Utc;
 use datafusion::common::tree_node::TreeNode;
+use futures::TryStreamExt;
+use http::Uri;
+use itertools::Itertools;
+use serde_json::Value as JsonValue;
 use std::net::SocketAddr;
 use std::sync::Arc;
 
 use futures_util::{Future, TryFutureExt};
 
-use tonic::transport::{Identity, Server, ServerTlsConfig};
+use tonic::transport::{Channel, Identity, Server, ServerTlsConfig};
 use tonic_web::GrpcWebLayer;
 
 use crate::event::commit_schema;
+use crate::handlers::http::cluster::get_ingestor_info;
 use crate::handlers::http::fetch_schema;
 use crate::option::{Mode, CONFIG};
 
@@ -103,8 +110,27 @@ impl FlightService for AirServiceImpl {
     async fn do_get(&self, req: Request<Ticket>) -> Result<Response<Self::DoGetStream>, Status> {
         let key = extract_session_key(req.metadata())?;
-        let ticket = serde_json::from_slice::<QueryJson>(&req.into_inner().ticket)
-            .map_err(|err| Status::internal(err.to_string()))?;
+
+        let ticket = if CONFIG.parseable.mode == Mode::Ingest {
+            let query = serde_json::from_slice::<JsonValue>(&req.into_inner().ticket)
+                .map_err(|_| Status::failed_precondition("Ticket is not valid json"))?["query"]
+                .as_str()
+                .ok_or_else(|| Status::failed_precondition("query is not valid string"))?
+                .to_owned();
+            QueryJson {
+                query,
+                send_null: false,
+                fields: false,
+                filter_tags: None,
+                // we can use humantime because into_query handles parsing
+                end_time: String::from("now"),
+                start_time: String::from("1min"),
+            }
+        } else {
+            serde_json::from_slice::<QueryJson>(&req.into_inner().ticket)
+                .map_err(|err| Status::internal(err.to_string()))?
+        };
+
         log::info!("airplane requested for query {:?}", ticket);
@@ -140,6 +166,49 @@ impl FlightService for AirServiceImpl {
         let mut query = into_query(&ticket, &session_state)
             .await
             .map_err(|_| Status::internal("Failed to parse query"))?;
+        let time_delta = query.end - Utc::now();
+
+        let minute_result = if CONFIG.parseable.mode == Mode::Query && time_delta.num_seconds() < 2
+        {
+            let sql = ticket.query.clone();
+            let ingester_metadatas = get_ingestor_info()
+                .await
+                .map_err(|err| Status::failed_precondition(err.to_string()))?;
+            let mut minute_result: Vec<RecordBatch> = vec![];
+
+            for im in ingester_metadatas.iter() {
+                let mut url = im.domain_name.rsplit(":").collect_vec();
+                let _ = url.pop();
+                url.reverse();
+                url.push(&im.flight_port);
+                let url = url.join("");
+                let url = url
+                    .parse::<Uri>()
+                    .map_err(|_| Status::failed_precondition("Ingester metadata is corrupted"))?;
+
+                let channel = Channel::builder(url)
+                    .connect()
+                    .await
+                    .map_err(|err| Status::failed_precondition(err.to_string()))?;
+
+                let mut client = FlightClient::new(channel);
+                client.add_header("authorization", &im.token)?;
+
+                let response = client
+                    .do_get(Ticket {
+                        ticket: sql.clone().into(),
+                    })
+                    .await?;
+
+                let mut batches: Vec<RecordBatch> = response.try_collect().await?;
+
+                minute_result.append(&mut batches);
+            }
+
+            Some(minute_result)
+        } else {
+            None
+        };
 
         // if table name is not present it is a Malformed Query
         let stream_name = query
             .first_table_name()
             .ok_or_else(|| Status::invalid_argument("Malformed Query"))?;
 
         let permissions = Users.get_permissions(&key);
 
-        let table_name = query
-            .first_table_name()
-            .ok_or_else(|| Status::invalid_argument("Malformed Query"))?;
-        authorize_and_set_filter_tags(&mut query, permissions, &table_name).map_err(|_| {
+        authorize_and_set_filter_tags(&mut query, permissions, &stream_name).map_err(|_| {
             Status::permission_denied("User Does not have permission to access this")
         })?;
 
-        let (results, _) = query
-            .execute(table_name.clone())
-            .await
-            .map_err(|err| Status::internal(err.to_string()))?;
         let schema = STREAM_INFO
             .schema(&stream_name)
             .map_err(|err| Status::failed_precondition(err.to_string()))?;
+
+        let (mut results, _) = query
+            .execute(stream_name)
+            .await
+            .map_err(|err| Status::internal(err.to_string()))?;
+
+        if let Some(mut minute_result) = minute_result {
+            results.append(&mut minute_result);
+        }
+
         let options = datafusion::arrow::ipc::writer::IpcWriteOptions::default();
         let schema_flight_data = SchemaAsIpc::new(&schema, &options);
diff --git a/server/src/handlers/http/modal/mod.rs b/server/src/handlers/http/modal/mod.rs
index edd7bd3c3..f50d8c31a 100644
--- a/server/src/handlers/http/modal/mod.rs
+++ b/server/src/handlers/http/modal/mod.rs
@@ -62,6 +62,7 @@ pub struct IngestorMetadata {
     pub bucket_name: String,
     pub token: String,
     pub ingestor_id: String,
+    pub flight_port: String,
 }
 
 impl IngestorMetadata {
@@ -73,6 +74,7 @@ impl IngestorMetadata {
         username: &str,
         password: &str,
         ingestor_id: String,
+        flight_port: String,
     ) -> Self {
         let token = base64::prelude::BASE64_STANDARD.encode(format!("{}:{}", username, password));
 
@@ -85,6 +87,7 @@ impl IngestorMetadata {
             bucket_name,
             token,
             ingestor_id,
+            flight_port,
         }
     }
 
@@ -110,9 +113,10 @@ mod test {
             "admin",
             "admin",
             "ingestor_id".to_string(),
+            "8002".to_string(),
         );
 
-        let rhs = serde_json::from_slice::<IngestorMetadata>(br#"{"version":"v3","port":"8000","domain_name":"https://localhost:8000","bucket_name":"somebucket","token":"Basic YWRtaW46YWRtaW4=", "ingestor_id": "ingestor_id"}"#).unwrap();
+        let rhs = serde_json::from_slice::<IngestorMetadata>(br#"{"version":"v3","port":"8000","domain_name":"https://localhost:8000","bucket_name":"somebucket","token":"Basic YWRtaW46YWRtaW4=", "ingestor_id": "ingestor_id","flight_port": "8002"}"#).unwrap();
 
         assert_eq!(rhs, lhs);
     }
@@ -127,13 +131,14 @@ mod test {
             "admin",
             "admin",
             "ingestor_id".to_string(),
+            "8002".to_string(),
         );
 
         let lhs = serde_json::to_string(&im)
             .unwrap()
             .try_into_bytes()
             .unwrap();
-        let rhs = br#"{"version":"v3","port":"8000","domain_name":"https://localhost:8000","bucket_name":"somebucket","token":"Basic YWRtaW46YWRtaW4=","ingestor_id":"ingestor_id"}"#
+        let rhs = br#"{"version":"v3","port":"8000","domain_name":"https://localhost:8000","bucket_name":"somebucket","token":"Basic YWRtaW46YWRtaW4=","ingestor_id":"ingestor_id","flight_port":"8002"}"#
             .try_into_bytes()
             .unwrap();
 
diff --git a/server/src/handlers/http/query.rs b/server/src/handlers/http/query.rs
index 288613ba5..eaab9f700 100644
--- a/server/src/handlers/http/query.rs
+++ b/server/src/handlers/http/query.rs
@@ -19,7 +19,7 @@
 use actix_web::http::header::ContentType;
 use actix_web::web::{self, Json};
 use actix_web::{FromRequest, HttpRequest, Responder};
-use chrono::{DateTime, Utc};
+use chrono::{DateTime, Duration, Utc};
 use datafusion::common::tree_node::TreeNode;
 use datafusion::error::DataFusionError;
 use datafusion::execution::context::SessionState;
@@ -51,14 +51,14 @@ use crate::utils::actix::extract_session_key_from_req;
 #[serde(rename_all = "camelCase")]
 pub struct Query {
     pub query: String,
-    start_time: String,
-    end_time: String,
+    pub start_time: String,
+    pub end_time: String,
     #[serde(default)]
     pub send_null: bool,
     #[serde(skip)]
     pub fields: bool,
     #[serde(skip)]
-    filter_tags: Option<Vec<String>>,
+    pub filter_tags: Option<Vec<String>>,
 }
 
diff --git a/server/src/storage/staging.rs b/server/src/storage/staging.rs
index abeac7062..0025db16d 100644
--- a/server/src/storage/staging.rs
+++ b/server/src/storage/staging.rs
@@ -380,6 +380,7 @@ pub fn get_ingestor_info() -> anyhow::Result<IngestorMetadata> {
         &CONFIG.parseable.username,
         &CONFIG.parseable.password,
         get_ingestor_id(),
+        CONFIG.parseable.flight_port.to_string(),
     );
 
     put_ingestor_info(out.clone())?;
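[Editor's note] After this patch there are two ticket shapes: a querier forwards ingestors a bare `{"query": ...}` object and fills in the one-minute window itself, while clients still send the full `Query` JSON. A standalone sketch of that wire format, using a local mirror of the struct (an illustrative copy, not the server's own type) to show the camelCase keys and the `send_null` default:

```rust
use serde::Deserialize;

// Illustrative mirror of the `Query` payload defined in
// server/src/handlers/http/query.rs above.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct QueryTicket {
    query: String,
    start_time: String,
    end_time: String,
    #[serde(default)]
    send_null: bool,
}

fn main() {
    let raw = br#"{"query": "select * from mystream", "startTime": "1min", "endTime": "now"}"#;
    let ticket: QueryTicket = serde_json::from_slice(raw).unwrap();
    // `sendNull` was omitted, so the serde default (false) applies, matching
    // the hard-coded ingest-mode defaults in the patch above.
    assert!(!ticket.send_null);
    println!("{ticket:?}");
}
```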
From 3347b0efc674014f57b9a6fdd2a160196c85e22c Mon Sep 17 00:00:00 2001
From: Eshan Chatterjee
Date: Mon, 29 Apr 2024 13:25:18 +0530
Subject: [PATCH 06/18] chore: clippy cleanup from nightly

---
 server/src/event/writer/file_writer.rs | 1 +
 1 file changed, 1 insertion(+)

diff --git a/server/src/event/writer/file_writer.rs b/server/src/event/writer/file_writer.rs
index 1b193eb4c..db729b8d1 100644
--- a/server/src/event/writer/file_writer.rs
+++ b/server/src/event/writer/file_writer.rs
@@ -29,6 +29,7 @@ use crate::storage::staging::StorageDir;
 use chrono::NaiveDateTime;
 
 pub struct ArrowWriter {
+    #[allow(dead_code)]
     pub file_path: PathBuf,
     pub writer: StreamWriter<File>,
 }
From aa473fc0e05a96598a94547aae432b078c2c9904 Mon Sep 17 00:00:00 2001
From: Eshan Chatterjee
Date: Mon, 29 Apr 2024 13:26:44 +0530
Subject: [PATCH 07/18] impl flight protocol for distributed

clean up

clean up 2
---
 server/src/handlers/airplane.rs       | 98 ++++++++-------------------
 server/src/handlers/http/modal/mod.rs |  1 +
 server/src/handlers/http/query.rs     |  2 +-
 server/src/utils/arrow.rs             | 25 +++++++
 server/src/utils/arrow/flight.rs      | 67 ++++++++++++++++++
 5 files changed, 122 insertions(+), 71 deletions(-)
 create mode 100644 server/src/utils/arrow/flight.rs

diff --git a/server/src/handlers/airplane.rs b/server/src/handlers/airplane.rs
index b5615ce99..8d061810d 100644
--- a/server/src/handlers/airplane.rs
+++ b/server/src/handlers/airplane.rs
@@ -1,19 +1,15 @@
 use arrow_array::RecordBatch;
 use arrow_flight::flight_service_server::FlightServiceServer;
-use arrow_flight::{FlightClient, PollInfo};
-use arrow_schema::ArrowError;
+use arrow_flight::PollInfo;
+use arrow_schema::{ArrowError, Schema};
 use chrono::Utc;
 use datafusion::common::tree_node::TreeNode;
-use futures::TryStreamExt;
-use http::Uri;
-use itertools::Itertools;
-use serde_json::Value as JsonValue;
 use std::net::SocketAddr;
 use std::sync::Arc;
 
 use futures_util::{Future, TryFutureExt};
 
-use tonic::transport::{Channel, Identity, Server, ServerTlsConfig};
+use tonic::transport::{Identity, Server, ServerTlsConfig};
 use tonic_web::GrpcWebLayer;
 
 use crate::event::commit_schema;
 use crate::handlers::http::cluster::get_ingestor_info;
 use crate::handlers::http::fetch_schema;
 use crate::option::{Mode, CONFIG};
 
 use crate::handlers::livetail::cross_origin_config;
 
-use crate::handlers::http::query::{authorize_and_set_filter_tags, into_query, Query as QueryJson};
+use crate::handlers::http::query::{authorize_and_set_filter_tags, into_query};
 use crate::query::{TableScanVisitor, QUERY_SESSION};
 use crate::storage::object_storage::commit_schema_to_storage;
-use arrow_ipc::writer::{DictionaryTracker, IpcDataGenerator, IpcWriteOptions};
-use futures::stream::BoxStream;
-
-use tonic::{Request, Response, Status, Streaming};
-
+use crate::utils::arrow::flight::{get_query_from_ticket, run_do_get_rpc};
 use arrow_flight::{
     flight_service_server::FlightService, Action, ActionType, Criteria, Empty, FlightData,
     FlightDescriptor, FlightInfo, HandshakeRequest, HandshakeResponse, PutResult, SchemaAsIpc,
     SchemaResult, Ticket,
 };
+use arrow_ipc::writer::{DictionaryTracker, IpcDataGenerator, IpcWriteOptions};
+use futures::stream::BoxStream;
+use tonic::{Request, Response, Status, Streaming};
 
 use crate::handlers::livetail::extract_session_key;
-
 use crate::metadata::STREAM_INFO;
-
 use crate::rbac::Users;
 
+const L_CURLY: char = '{';
+const R_CURLY: char = '}';
+
 #[derive(Clone)]
 pub struct AirServiceImpl {}
 
@@ -111,27 +107,9 @@ impl FlightService for AirServiceImpl {
     async fn do_get(&self, req: Request<Ticket>) -> Result<Response<Self::DoGetStream>, Status> {
         let key = extract_session_key(req.metadata())?;
 
-        let ticket = if CONFIG.parseable.mode == Mode::Ingest {
-            let query = serde_json::from_slice::<JsonValue>(&req.into_inner().ticket)
-                .map_err(|_| Status::failed_precondition("Ticket is not valid json"))?["query"]
-                .as_str()
-                .ok_or_else(|| Status::failed_precondition("query is not valid string"))?
-                .to_owned();
-            QueryJson {
-                query,
-                send_null: false,
-                fields: false,
-                filter_tags: None,
-                // we can use humantime because into_query handles parsing
-                end_time: String::from("now"),
-                start_time: String::from("1min"),
-            }
-        } else {
-            serde_json::from_slice::<QueryJson>(&req.into_inner().ticket)
-                .map_err(|err| Status::internal(err.to_string()))?
-        };
+        let ticket = get_query_from_ticket(req)?;
 
-        log::info!("airplane requested for query {:?}", ticket);
+        log::info!("query requested to airplane: {:?}", ticket);
 
         // get the query session_state
         let session_state = QUERY_SESSION.state();
@@ -141,7 +119,7 @@ impl FlightService for AirServiceImpl {
         let raw_logical_plan = session_state
             .create_logical_plan(&ticket.query)
             .await
             .map_err(|err| {
-                log::error!("Failed to create logical plan: {}", err);
+                log::error!("Datafusion Error: Failed to create logical plan: {}", err);
                 Status::internal("Failed to create logical plan")
             })?;
 
@@ -153,7 +131,6 @@ impl FlightService for AirServiceImpl {
 
         if CONFIG.parseable.mode == Mode::Query {
             // using http to get the schema. may update to use flight later
-
             for table in tables {
                 if let Ok(new_schema) = fetch_schema(&table).await {
                     // commit schema merges the schema internally and updates the schema in storage.
@@ -170,42 +147,19 @@ impl FlightService for AirServiceImpl {
         let mut query = into_query(&ticket, &session_state)
             .await
             .map_err(|_| Status::internal("Failed to parse query"))?;
 
         let time_delta = query.end - Utc::now();
 
-        let minute_result = if CONFIG.parseable.mode == Mode::Query && time_delta.num_seconds() < 2
-        {
-            let sql = ticket.query.clone();
+        let minute_result = if CONFIG.parseable.mode == Mode::Query && time_delta.num_seconds() < 2
+        {
+            let sql = format!("{}\"query\": \"{}\"{}", L_CURLY, &ticket.query, R_CURLY);
             let ingester_metadatas = get_ingestor_info()
                 .await
                 .map_err(|err| Status::failed_precondition(err.to_string()))?;
             let mut minute_result: Vec<RecordBatch> = vec![];
 
-            for im in ingester_metadatas.iter() {
-                let mut url = im.domain_name.rsplit(":").collect_vec();
-                let _ = url.pop();
-                url.reverse();
-                url.push(&im.flight_port);
-                let url = url.join("");
-                let url = url
-                    .parse::<Uri>()
-                    .map_err(|_| Status::failed_precondition("Ingester metadata is corrupted"))?;
-
-                let channel = Channel::builder(url)
-                    .connect()
-                    .await
-                    .map_err(|err| Status::failed_precondition(err.to_string()))?;
-
-                let mut client = FlightClient::new(channel);
-                client.add_header("authorization", &im.token)?;
-
-                let response = client
-                    .do_get(Ticket {
-                        ticket: sql.clone().into(),
-                    })
-                    .await?;
-
-                let mut batches: Vec<RecordBatch> = response.try_collect().await?;
-
-                minute_result.append(&mut batches);
+            for im in ingester_metadatas {
+                let mut batches = run_do_get_rpc(im, sql.clone()).await?;
+                minute_result.append(&mut batches);
             }
 
             Some(minute_result)
         } else {
             None
         };
@@ -219,20 +173,26 @@ impl FlightService for AirServiceImpl {
 
         authorize_and_set_filter_tags(&mut query, permissions, &stream_name).map_err(|_| {
             Status::permission_denied("User Does not have permission to access this")
         })?;
 
-        let schema = STREAM_INFO
-            .schema(&stream_name)
-            .map_err(|err| Status::failed_precondition(err.to_string()))?;
-
         let (mut results, _) = query
             .execute(stream_name)
             .await
-            .map_err(|err| Status::internal(err.to_string()))?;
+            .map_err(|err| Status::internal(err.to_string()))
+            .unwrap();
 
         if let Some(mut minute_result) = minute_result {
             results.append(&mut minute_result);
-        }
+        };
+
+        let schemas = results
+            .iter()
+            .map(|batch| batch.schema())
+            .map(|s| s.as_ref().clone())
+            .collect::<Vec<_>>();
+
+        let schema =
+            Schema::try_merge(schemas).map_err(|err| Status::internal(err.to_string()))?;
         let options = datafusion::arrow::ipc::writer::IpcWriteOptions::default();
         let schema_flight_data = SchemaAsIpc::new(&schema, &options);
diff --git a/server/src/handlers/http/modal/mod.rs b/server/src/handlers/http/modal/mod.rs
index f50d8c31a..8af1119e3 100644
--- a/server/src/handlers/http/modal/mod.rs
+++ b/server/src/handlers/http/modal/mod.rs
@@ -66,6 +66,7 @@ pub struct IngestorMetadata {
 }
 
 impl IngestorMetadata {
+    #[allow(clippy::too_many_arguments)]
     pub fn new(
         port: String,
         domain_name: String,
diff --git a/server/src/handlers/http/query.rs b/server/src/handlers/http/query.rs
index eaab9f700..acfc6191b 100644
--- a/server/src/handlers/http/query.rs
+++ b/server/src/handlers/http/query.rs
@@ -19,7 +19,7 @@
 use actix_web::http::header::ContentType;
 use actix_web::web::{self, Json};
 use actix_web::{FromRequest, HttpRequest, Responder};
-use chrono::{DateTime, Duration, Utc};
+use chrono::{DateTime, Utc};
 use datafusion::common::tree_node::TreeNode;
 use datafusion::error::DataFusionError;
 use datafusion::execution::context::SessionState;
diff --git a/server/src/utils/arrow.rs b/server/src/utils/arrow.rs
index 945b637fe..c8babf85d 100644
--- a/server/src/utils/arrow.rs
+++ b/server/src/utils/arrow.rs
@@ -24,6 +24,7 @@ use arrow_schema::Schema;
 use itertools::Itertools;
 
 pub mod batch_adapter;
+pub mod flight;
 pub mod merged_reader;
 pub mod reverse_reader;
 
@@ -32,6 +33,30 @@ pub use batch_adapter::adapt_batch;
 pub use merged_reader::MergedRecordReader;
 use serde_json::{Map, Value};
 
+/// example function for concat recordbatch (may not work)
+/// use arrow::record_batch::RecordBatch;
+/// use arrow::error::Result;
+///
+/// fn concat_batches(batch1: RecordBatch, batch2: RecordBatch) -> Result<RecordBatch> {
+///     let schema = batch1.schema();
+///     let columns = schema
+///         .fields()
+///         .iter()
+///         .enumerate()
+///         .map(|(i, _)| -> Result<_> {
+///             let array1 = batch1.column(i);
+///             let array2 = batch2.column(i);
+///             let array = arrow::compute::concat(&[array1.as_ref(), array2.as_ref()])?;
+///             Ok(array)
+///         })
+///         .collect::<Result<Vec<_>>>()?;
+///
+///     RecordBatch::try_new(schema.clone(), columns)
+/// }
+///
+
+
 /// Replaces columns in a record batch with new arrays.
 ///
 /// # Arguments
diff --git a/server/src/utils/arrow/flight.rs b/server/src/utils/arrow/flight.rs
new file mode 100644
index 000000000..4f8273bca
--- /dev/null
+++ b/server/src/utils/arrow/flight.rs
@@ -0,0 +1,67 @@
+use crate::handlers::http::query::Query as QueryJson;
+use crate::{
+    handlers::http::modal::IngestorMetadata,
+    option::{Mode, CONFIG},
+};
+use arrow_array::RecordBatch;
+use arrow_flight::Ticket;
+use futures::TryStreamExt;
+use serde_json::Value as JsonValue;
+use tonic::{Request, Status};
+
+use arrow_flight::FlightClient;
+use http::Uri;
+use tonic::transport::Channel;
+
+pub fn get_query_from_ticket(req: Request<Ticket>) -> Result<QueryJson, Status> {
+    if CONFIG.parseable.mode == Mode::Ingest {
+        let query = serde_json::from_slice::<JsonValue>(&req.into_inner().ticket)
+            .map_err(|_| Status::failed_precondition("Ticket is not valid json"))?["query"]
+            .as_str()
+            .ok_or_else(|| Status::failed_precondition("query is not valid string"))?
+            .to_owned();
+        Ok(QueryJson {
+            query,
+            send_null: false,
+            fields: false,
+            filter_tags: None,
+            // we can use humantime because into_query handles parsing
+            end_time: String::from("now"),
+            start_time: String::from("1min"),
+        })
+    } else {
+        Ok(
+            serde_json::from_slice::<QueryJson>(&req.into_inner().ticket)
+                .map_err(|err| Status::internal(err.to_string()))?,
+        )
+    }
+}
+
+pub async fn run_do_get_rpc(im: IngestorMetadata, sql: String) -> Result<Vec<RecordBatch>, Status> {
+    let url = im
+        .domain_name
+        .rsplit_once(':')
+        .ok_or(Status::failed_precondition(
+            "Ingester metadata is corrupted",
+        ))?
+        .0;
+    let url = format!("{}:{}", url, im.flight_port);
+    let url = url
+        .parse::<Uri>()
+        .map_err(|_| Status::failed_precondition("Ingester metadata is corrupted"))?;
+    let channel = Channel::builder(url)
+        .connect()
+        .await
+        .map_err(|err| Status::failed_precondition(err.to_string()))?;
+
+    let mut client = FlightClient::new(channel);
+    client.add_header("authorization", &im.token)?;
+
+    let response = client
+        .do_get(Ticket {
+            ticket: sql.clone().into(),
+        })
+        .await?;
+
+    Ok(response.try_collect().await?)
+}
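[Editor's note] `run_do_get_rpc` derives an ingestor's Flight endpoint by splitting the stored HTTP domain name at its last colon and re-attaching the flight port. A worked sketch of that rewrite, with illustrative host and port values:

```rust
// Mirrors the rsplit_once(':') logic in run_do_get_rpc above.
fn flight_url(domain_name: &str, flight_port: &str) -> Option<String> {
    // "http://ingestor-1:8000" splits at the last ':' into
    // ("http://ingestor-1", "8000"); the HTTP port is discarded.
    let (base, _http_port) = domain_name.rsplit_once(':')?;
    Some(format!("{base}:{flight_port}"))
}

fn main() {
    assert_eq!(
        flight_url("http://ingestor-1:8000", "8002").as_deref(),
        Some("http://ingestor-1:8002")
    );
}
```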
From 94ad5e268c6e8f2971de77a6b649085eed8eaedb Mon Sep 17 00:00:00 2001
From: Eshan Chatterjee
Date: Mon, 29 Apr 2024 17:31:10 +0530
Subject: [PATCH 08/18] impl flight protocol with staging data

---
 server/src/event.rs                   | 16 +++++++--
 server/src/event/writer.rs            | 50 ++++++++++++++++++++++-----
 server/src/event/writer/mem_writer.rs |  8 +++++
 server/src/handlers/airplane.rs       | 43 ++++++++++++++---------
 server/src/handlers/http/ingest.rs    | 19 ++++++++++
 server/src/metadata.rs                |  2 +-
 server/src/utils/arrow.rs             |  2 --
 7 files changed, 110 insertions(+), 30 deletions(-)

diff --git a/server/src/event.rs b/server/src/event.rs
index eeb95e0b0..72a8fadc3 100644
--- a/server/src/event.rs
+++ b/server/src/event.rs
@@ -48,7 +48,7 @@ pub struct Event {
 
 // Events holds the schema related to a each event for a single log stream
 impl Event {
-    pub async fn process(self) -> Result<(), EventError> {
+    pub async fn process(&self) -> Result<(), EventError> {
         let mut key = get_schema_key(&self.rb.schema().fields);
         if self.time_partition.is_some() {
             let parsed_timestamp_to_min = self.parsed_timestamp.format("%Y%m%dT%H%M").to_string();
@@ -77,7 +77,7 @@ impl Event {
         crate::livetail::LIVETAIL.process(&self.stream_name, &self.rb);
 
         if let Err(e) = metadata::STREAM_INFO
-            .check_alerts(&self.stream_name, self.rb)
+            .check_alerts(&self.stream_name, &self.rb)
             .await
         {
             log::error!("Error checking for alerts. {:?}", e);
@@ -86,6 +86,18 @@ impl Event {
         Ok(())
     }
 
+    pub fn process_unchecked(&self) -> Result<(), EventError> {
+        let key = get_schema_key(&self.rb.schema().fields);
+
+        Self::process_event(&self.stream_name, &key, self.rb.clone(), self.parsed_timestamp)?;
+
+        Ok(())
+    }
+
+    pub fn clear(&self, stream_name: &str) {
+        STREAM_WRITERS.clear(stream_name);
+    }
+
     // event process all events after the 1st event. Concatenates record batches
     // and puts them in memory store for each event.
     fn process_event(
diff --git a/server/src/event/writer.rs b/server/src/event/writer.rs
index 737a0a514..9783b4df8 100644
--- a/server/src/event/writer.rs
+++ b/server/src/event/writer.rs
@@ -25,8 +25,12 @@ use std::{
     sync::{Arc, Mutex, RwLock},
 };
 
+use crate::{
+    option::{Mode, CONFIG},
+    utils,
+};
+
 use self::{errors::StreamWriterError, file_writer::FileWriter, mem_writer::MemWriter};
-use crate::utils;
 use arrow_array::{RecordBatch, TimestampMillisecondArray};
 use arrow_schema::Schema;
 use chrono::NaiveDateTime;
@@ -62,6 +66,12 @@ impl Writer {
         self.mem.push(schema_key, rb);
         Ok(())
     }
+
+    fn push_mem(&mut self, schema_key: &str, rb: RecordBatch) -> Result<(), StreamWriterError> {
+        self.mem.push(schema_key, rb);
+        Ok(())
+    }
 }
 
 #[derive(Deref, DerefMut, Default)]
@@ -80,12 +90,19 @@ impl WriterTable {
 
         match hashmap_guard.get(stream_name) {
             Some(stream_writer) => {
-                stream_writer.lock().unwrap().push(
-                    stream_name,
-                    schema_key,
-                    record,
-                    parsed_timestamp,
-                )?;
+                if CONFIG.parseable.mode != Mode::Query {
+                    stream_writer.lock().unwrap().push(
+                        stream_name,
+                        schema_key,
+                        record,
+                        parsed_timestamp,
+                    )?;
+                } else {
+                    stream_writer
+                        .lock()
+                        .unwrap()
+                        .push_mem(stream_name, record)?;
+                }
             }
             None => {
                 drop(hashmap_guard);
@@ -93,12 +110,20 @@ impl WriterTable {
                 // check for race condition
                 // if map contains entry then just
                 if let Some(writer) = map.get(stream_name) {
-                    writer.lock().unwrap().push(
-                        stream_name,
-                        schema_key,
-                        record,
-                        parsed_timestamp,
-                    )?;
+                    if CONFIG.parseable.mode != Mode::Query {
+                        writer.lock().unwrap().push(
+                            stream_name,
+                            schema_key,
+                            record,
+                            parsed_timestamp,
+                        )?;
+                    } else {
+                        writer.lock().unwrap().push_mem(stream_name, record)?;
+                    }
+                } else if CONFIG.parseable.mode != Mode::Query {
+                    let mut writer = Writer::default();
+                    writer.push(stream_name, schema_key, record, parsed_timestamp)?;
+                    map.insert(stream_name.to_owned(), Mutex::new(writer));
                 } else {
                     let mut writer = Writer::default();
                     writer.push(stream_name, schema_key, record, parsed_timestamp)?;
@@ -109,6 +134,13 @@ impl WriterTable {
         Ok(())
     }
 
+    pub fn clear(&self, stream_name: &str) {
+        let map = self.write().unwrap();
+        if let Some(writer) = map.get(stream_name) {
+            writer.lock().unwrap().mem.clear();
+        }
+    }
+
     pub fn delete_stream(&self, stream_name: &str) {
         self.write().unwrap().remove(stream_name);
     }
diff --git a/server/src/event/writer/mem_writer.rs b/server/src/event/writer/mem_writer.rs
index 1f5ce4532..6369834ff 100644
--- a/server/src/event/writer/mem_writer.rs
+++ b/server/src/event/writer/mem_writer.rs
@@ -62,6 +62,14 @@ impl MemWriter {
         }
     }
 
+    pub fn clear(&mut self) {
+        self.schema = Schema::empty();
+        self.schema_map.clear();
+        self.read_buffer.clear();
+        self.mutable_buffer.inner.clear();
+        self.mutable_buffer.rows = 0;
+    }
+
     pub fn recordbatch_cloned(&self, schema: &Arc<Schema>) -> Vec<RecordBatch> {
         let mut read_buffer = self.read_buffer.clone();
         if self.mutable_buffer.rows > 0 {
diff --git a/server/src/handlers/airplane.rs b/server/src/handlers/airplane.rs
index 8d061810d..23f4b4ceb 100644
--- a/server/src/handlers/airplane.rs
+++ b/server/src/handlers/airplane.rs
@@ -15,6 +15,7 @@ use tonic_web::GrpcWebLayer;
 use crate::event::commit_schema;
 use crate::handlers::http::cluster::get_ingestor_info;
 use crate::handlers::http::fetch_schema;
+use crate::handlers::http::ingest::push_logs_unchecked;
 use crate::option::{Mode, CONFIG};
 
 use crate::handlers::livetail::cross_origin_config;
@@ -148,11 +149,18 @@ impl FlightService for AirServiceImpl {
             .await
             .map_err(|_| Status::internal("Failed to parse query"))?;
 
+        // if table name is not present it is a Malformed Query
+        let stream_name = query
+            .first_table_name()
+            .ok_or_else(|| Status::invalid_argument("Malformed Query"))?;
+
         let time_delta = query.end - Utc::now();
 
-        let minute_result = if CONFIG.parseable.mode == Mode::Query && time_delta.num_seconds() < 2
-        {
-            let sql = format!("{}\"query\": \"{}\"{}", L_CURLY, &ticket.query, R_CURLY);
+        let events = if CONFIG.parseable.mode == Mode::Query && time_delta.num_seconds() < 2 {
+            let sql = format!(
+                "{}\"query\": \"select * from {}\"{}",
+                L_CURLY, &stream_name, R_CURLY
+            );
             let ingester_metadatas = get_ingestor_info()
                 .await
                 .map_err(|err| Status::failed_precondition(err.to_string()))?;
             let mut minute_result: Vec<RecordBatch> = vec![];
 
             for im in ingester_metadatas {
                 let mut batches = run_do_get_rpc(im, sql.clone()).await?;
                 minute_result.append(&mut batches);
             }
-
-            Some(minute_result)
+            let mut events = vec![];
+            for batch in minute_result {
+                events.push(
+                    push_logs_unchecked(batch, &stream_name)
+                        .await
+                        .map_err(|err| Status::internal(err.to_string()))?,
+                );
+            }
+            Some(events)
         } else {
             None
         };
 
-        // if table name is not present it is a Malformed Query
-        let stream_name = query
-            .first_table_name()
-            .ok_or_else(|| Status::invalid_argument("Malformed Query"))?;
-
         let permissions = Users.get_permissions(&key);
 
         authorize_and_set_filter_tags(&mut query, permissions, &stream_name).map_err(|_| {
             Status::permission_denied("User Does not have permission to access this")
         })?;
 
-        let (mut results, _) = query
-            .execute(stream_name)
+        let (results, _) = query
+            .execute(stream_name.clone())
             .await
             .map_err(|err| Status::internal(err.to_string()))
             .unwrap();
 
-        if let Some(mut minute_result) = minute_result {
-            results.append(&mut minute_result);
-        };
-
         let schemas = results
             .iter()
             .map(|batch| batch.schema())
             .map(|s| s.as_ref().clone())
             .collect::<Vec<_>>();
@@ -216,6 +224,11 @@ impl FlightService for AirServiceImpl {
         let output = futures::stream::iter(flights.into_iter().map(Ok));
+        if let Some(events) = events {
+            for event in events {
+                event.clear(&stream_name);
+            }
+        }
         Ok(Response::new(Box::pin(output) as Self::DoGetStream))
     }
 
diff --git a/server/src/handlers/http/ingest.rs b/server/src/handlers/http/ingest.rs
index 7532ad59e..8bedde495 100644
--- a/server/src/handlers/http/ingest.rs
+++ b/server/src/handlers/http/ingest.rs
@@ -33,6 +33,7 @@ use crate::storage::{LogStream, ObjectStorageError};
 use crate::utils::header_parsing::{collect_labelled_headers, ParseHeaderError};
 use crate::utils::json::convert_array_to_object;
 use actix_web::{http::header::ContentType, HttpRequest, HttpResponse};
+use arrow_array::RecordBatch;
 use arrow_schema::{Field, Schema};
 use bytes::Bytes;
 use chrono::{DateTime, Utc};
@@ -40,6 +41,7 @@ use http::StatusCode;
 use serde_json::Value;
 use std::collections::{BTreeMap, HashMap};
 use std::sync::Arc;
+
 // Handler for POST /api/v1/ingest
 // ingests events by extracting stream name from header
 // creates if stream does not exist
@@ -96,6 +98,23 @@ pub async fn post_event(req: HttpRequest, body: Bytes) -> Result<HttpResponse, PostError>
     Ok(HttpResponse::Ok().finish())
 }
 
+pub async fn push_logs_unchecked(
+    batches: RecordBatch,
+    stream_name: &str,
+) -> Result<event::Event, PostError> {
+    todo!("timepartition fix");
+
+    // let event = event::Event {
+    //     rb: batches,
+    //     stream_name: stream_name.to_string(),
+    //     origin_format: "json",
+    //     origin_size: 0,
+    //     is_first_event: true,
+    // };
+    // event.process_unchecked()?;
+    // Ok(event)
+}
+
 async fn push_logs(stream_name: String, req: HttpRequest, body: Bytes) -> Result<(), PostError> {
     let glob_storage = CONFIG.storage().get_object_store();
     let object_store_format = glob_storage
diff --git a/server/src/metadata.rs b/server/src/metadata.rs
index 4140f5156..ce8a9dad2 100644
--- a/server/src/metadata.rs
+++ b/server/src/metadata.rs
@@ -63,7 +63,7 @@ impl StreamInfo {
     pub async fn check_alerts(
         &self,
         stream_name: &str,
-        rb: RecordBatch,
+        rb: &RecordBatch,
     ) -> Result<(), CheckAlertError> {
         let map = self.read().expect(LOCK_EXPECT);
         let meta = map
diff --git a/server/src/utils/arrow.rs b/server/src/utils/arrow.rs
index c8babf85d..4bb9cde93 100644
--- a/server/src/utils/arrow.rs
+++ b/server/src/utils/arrow.rs
@@ -55,8 +55,6 @@ use serde_json::{Map, Value};
 /// }
 ///
-
-
 /// Replaces columns in a record batch with new arrays.
 ///
 /// # Arguments
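[Editor's note] With staging batches mixed into the result set, `do_get` no longer trusts the stored stream schema; since patch 07 it derives the response schema from the batches themselves via `Schema::try_merge`. A small standalone example of what that merge produces when batches disagree on fields (field names and types are illustrative):

```rust
use arrow_schema::{ArrowError, DataType, Field, Schema};

fn main() -> Result<(), ArrowError> {
    // Two per-batch schemas, as collected from `results` in do_get.
    let batch_a = Schema::new(vec![Field::new("ts", DataType::Int64, false)]);
    let batch_b = Schema::new(vec![
        Field::new("ts", DataType::Int64, false),
        Field::new("msg", DataType::Utf8, true),
    ]);
    // The merged schema is the union of the field sets.
    let merged = Schema::try_merge(vec![batch_a, batch_b])?;
    assert_eq!(merged.fields().len(), 2);
    Ok(())
}
```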
From e61e4c248e6513bb73edf3a1c66a9b178068b3ac Mon Sep 17 00:00:00 2001
From: Eshan Chatterjee
Date: Tue, 30 Apr 2024 13:32:24 +0530
Subject: [PATCH 09/18] impl ingester metadata migration

---
 .../src/handlers/http/modal/ingest_server.rs | 14 ++++---
 server/src/migration.rs                      |  2 +-
 server/src/migration/metadata_migration.rs   | 38 ++++++++++++++++++-
 server/src/storage/object_storage.rs         |  2 +-
 server/src/storage/staging.rs                | 35 ++++++++++++-----
 5 files changed, 72 insertions(+), 19 deletions(-)

diff --git a/server/src/handlers/http/modal/ingest_server.rs b/server/src/handlers/http/modal/ingest_server.rs
index 04194189d..732949cbb 100644
--- a/server/src/handlers/http/modal/ingest_server.rs
+++ b/server/src/handlers/http/modal/ingest_server.rs
@@ -24,6 +24,7 @@ use crate::handlers::http::middleware::RouteExt;
 use crate::localcache::LocalCacheManager;
 use crate::metadata;
 use crate::metrics;
+use crate::migration::metadata_migration::migrate_ingester_metadata;
 use crate::rbac;
 use crate::rbac::role::Action;
 use crate::storage;
@@ -39,6 +40,10 @@ use super::IngestorMetadata;
 use super::OpenIdClient;
 use super::ParseableServer;
 
+use crate::{
+    handlers::http::{base_path, cross_origin_config},
+    option::CONFIG,
+};
 use actix_web::body::MessageBody;
 use actix_web::Scope;
 use actix_web::{web, App, HttpServer};
@@ -50,14 +55,9 @@ use itertools::Itertools;
 use once_cell::sync::Lazy;
 use relative_path::RelativePathBuf;
 
-use crate::{
-    handlers::http::{base_path, cross_origin_config},
-    option::CONFIG,
-};
-
 /// ! have to use a guard before using it
 pub static INGESTOR_META: Lazy<IngestorMetadata> =
-    Lazy::new(|| staging::get_ingestor_info().expect("dir is readable and writeable"));
+    Lazy::new(|| staging::get_ingestor_info().expect("Should Be valid Json"));
 
 #[derive(Default)]
 pub struct IngestServer;
@@ -106,6 +106,8 @@ impl ParseableServer for IngestServer {
 
     /// implement the init method will just invoke the initialize method
     async fn init(&self) -> anyhow::Result<()> {
+        migrate_ingester_metadata().await?;
+
         self.validate()?;
 
         // check for querier state. Is it there, or was it there in the past
diff --git a/server/src/migration.rs b/server/src/migration.rs
index 9e7e9a3db..d9c15fc4c 100644
--- a/server/src/migration.rs
+++ b/server/src/migration.rs
@@ -17,7 +17,7 @@
  *
  */
 
-mod metadata_migration;
+pub mod metadata_migration;
 mod schema_migration;
 mod stream_metadata_migration;
 
diff --git a/server/src/migration/metadata_migration.rs b/server/src/migration/metadata_migration.rs
index cbeee200a..385a99e2b 100644
--- a/server/src/migration/metadata_migration.rs
+++ b/server/src/migration/metadata_migration.rs
@@ -19,7 +19,12 @@
 use rand::distributions::DistString;
 use serde_json::{Map, Value as JsonValue};
 
-use crate::option::CONFIG;
+use crate::{
+    handlers::http::modal::IngestorMetadata,
+    option::CONFIG,
+    storage::{object_storage::ingestor_metadata_path, staging},
+};
+use actix_web::body::MessageBody;
 
 /*
 v1
@@ -118,3 +123,34 @@ pub fn update_v3(mut storage_metadata: JsonValue) -> JsonValue {
 
     storage_metadata
 }
+
+pub async fn migrate_ingester_metadata() -> anyhow::Result<()> {
+    let imp = ingestor_metadata_path(None);
+    let bytes = CONFIG.storage().get_object_store().get_object(&imp).await?;
+    let mut json = serde_json::from_slice::<JsonValue>(&bytes)?;
+    let meta = json
+        .as_object_mut()
+        .ok_or_else(|| anyhow::anyhow!("Unable to parse Ingester Metadata"))?;
+    let fp = meta.get("flight_port");
+
+    if fp.is_none() {
+        meta.insert(
+            "flight_port".to_owned(),
+            JsonValue::String(CONFIG.parseable.flight_port.to_string()),
+        );
+    }
+    let bytes = serde_json::to_string(&json)?
+        .try_into_bytes()
+        .map_err(|err| anyhow::anyhow!(err))?;
+
+    let resource: IngestorMetadata = serde_json::from_value(json)?;
+    staging::put_ingestor_info(resource.clone())?;
+
+    CONFIG
+        .storage()
+        .get_object_store()
+        .put_object(&imp, bytes)
+        .await?;
+
+    Ok(())
+}
diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs
index b0846d0d8..c8871536e 100644
--- a/server/src/storage/object_storage.rs
+++ b/server/src/storage/object_storage.rs
@@ -536,7 +536,7 @@ pub async fn commit_schema_to_storage(
 }
 
 #[inline(always)]
-fn to_bytes(any: &(impl ?Sized + serde::Serialize)) -> Bytes {
+pub fn to_bytes(any: &(impl ?Sized + serde::Serialize)) -> Bytes {
     serde_json::to_vec(any)
         .map(|any| any.into())
         .expect("serialize cannot fail")
diff --git a/server/src/storage/staging.rs b/server/src/storage/staging.rs
index 0025db16d..bfd51a503 100644
--- a/server/src/storage/staging.rs
+++ b/server/src/storage/staging.rs
@@ -17,14 +17,6 @@
  *
  */
 
-use std::{
-    collections::HashMap,
-    fs,
-    path::{Path, PathBuf},
-    process,
-    sync::Arc,
-};
-
 use crate::{
     event::DEFAULT_TIMESTAMP_KEY,
     handlers::http::modal::{ingest_server::INGESTOR_META, IngestorMetadata, DEFAULT_VERSION},
@@ -36,6 +28,7 @@ use crate::{
         hostname_unchecked,
     },
 };
+use anyhow::anyhow;
 use arrow_schema::{ArrowError, Schema};
 use base64::Engine;
 use chrono::{NaiveDateTime, Timelike};
@@ -48,6 +41,14 @@ use parquet::{
     schema::types::ColumnPath,
 };
 use rand::distributions::DistString;
+use serde_json::Value as JsonValue;
+use std::{
+    collections::HashMap,
+    fs,
+    path::{Path, PathBuf},
+    process,
+    sync::Arc,
+};
 
 const ARROW_FILE_EXTENSION: &str = "data.arrows";
 const PARQUET_FILE_EXTENSION: &str = "data.parquet";
@@ -332,7 +333,21 @@ pub fn get_ingestor_info() -> anyhow::Result<IngestorMetadata> {
 
     if flag {
         // get the ingestor metadata from staging
-        let mut meta: IngestorMetadata = serde_json::from_slice(&std::fs::read(path)?)?;
+        let mut meta: JsonValue = serde_json::from_slice(&std::fs::read(path)?)?;
+
migrate the staging meta + let obj = meta + .as_object_mut() + .ok_or_else(|| anyhow!("Could Not parse Ingestor Metadata Json"))?; + + if obj.get("flight_port").is_none() { + obj.insert( + "flight_port".to_owned(), + JsonValue::String(CONFIG.parseable.flight_port.to_string()), + ); + } + + let mut meta: IngestorMetadata = serde_json::from_value(meta)?; // compare url endpoint and port if meta.domain_name != url { @@ -393,7 +408,7 @@ pub fn get_ingestor_info() -> anyhow::Result { /// # Parameters /// /// * `ingestor_info`: The ingestor info to be stored. -fn put_ingestor_info(info: IngestorMetadata) -> anyhow::Result<()> { +pub fn put_ingestor_info(info: IngestorMetadata) -> anyhow::Result<()> { let path = PathBuf::from(&CONFIG.parseable.local_staging_path); let file_name = format!("ingestor.{}.json", info.ingestor_id); let file_path = path.join(file_name); From 82ea53d9a1cd8b57d62984f51b726cb53bb60237 Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Tue, 30 Apr 2024 16:15:54 +0530 Subject: [PATCH 10/18] fix: bugs causing server to not start in ingest mode --- server/src/handlers/airplane.rs | 2 +- server/src/handlers/http/cluster/utils.rs | 10 ++++++++-- server/src/handlers/http/modal/ingest_server.rs | 4 ++-- server/src/migration/metadata_migration.rs | 8 +++++++- server/src/utils/arrow/flight.rs | 3 ++- 5 files changed, 20 insertions(+), 7 deletions(-) diff --git a/server/src/handlers/airplane.rs b/server/src/handlers/airplane.rs index 23f4b4ceb..4dda7f279 100644 --- a/server/src/handlers/airplane.rs +++ b/server/src/handlers/airplane.rs @@ -156,7 +156,7 @@ impl FlightService for AirServiceImpl { let time_delta = query.end - Utc::now(); - let events = if CONFIG.parseable.mode == Mode::Query && time_delta.num_seconds() < 2 { + let events = if CONFIG.parseable.mode == Mode::Query && time_delta.num_seconds() < 1 { let sql = format!( "{}\"query\": \"select * from {}\"{}", L_CURLY, &stream_name, R_CURLY diff --git a/server/src/handlers/http/cluster/utils.rs b/server/src/handlers/http/cluster/utils.rs index 1caf9cd65..cea27bb04 100644 --- a/server/src/handlers/http/cluster/utils.rs +++ b/server/src/handlers/http/cluster/utils.rs @@ -16,7 +16,9 @@ * */ -use crate::handlers::http::{logstream::error::StreamError, modal::IngestorMetadata}; +use crate::handlers::http::{ + base_path_without_preceding_slash, logstream::error::StreamError, modal::IngestorMetadata, +}; use actix_web::http::header; use chrono::{DateTime, Utc}; use http::StatusCode; @@ -161,7 +163,11 @@ pub fn merge_quried_stats(stats: Vec) -> QueriedStats { } pub async fn check_liveness(domain_name: &str) -> bool { - let uri = match Url::parse(&format!("{}liveness", domain_name)) { + let uri = match Url::parse(&format!( + "{}{}/liveness", + domain_name, + base_path_without_preceding_slash() + )) { Ok(uri) => uri, Err(err) => { log::error!("Node Indentifier Failed To Parse: {}", err); diff --git a/server/src/handlers/http/modal/ingest_server.rs b/server/src/handlers/http/modal/ingest_server.rs index 732949cbb..2ce245949 100644 --- a/server/src/handlers/http/modal/ingest_server.rs +++ b/server/src/handlers/http/modal/ingest_server.rs @@ -106,8 +106,6 @@ impl ParseableServer for IngestServer { /// implement the init method will just invoke the initialize method async fn init(&self) -> anyhow::Result<()> { - migrate_ingester_metadata().await?; - self.validate()?; // check for querier state. 
Is it there, or was it there in the past @@ -116,6 +114,7 @@ impl ParseableServer for IngestServer { self.validate_credentials().await?; let metadata = storage::resolve_parseable_metadata().await?; + banner::print(&CONFIG, &metadata).await; rbac::map::init(&metadata); // set the info in the global metadata @@ -217,6 +216,7 @@ impl IngestServer { // create the ingestor metadata and put the .ingestor.json file in the object store async fn set_ingestor_metadata(&self) -> anyhow::Result<()> { + migrate_ingester_metadata().await?; let store = CONFIG.storage().get_object_store(); // find the meta file in staging if not generate new metadata diff --git a/server/src/migration/metadata_migration.rs b/server/src/migration/metadata_migration.rs index 385a99e2b..72bab9db6 100644 --- a/server/src/migration/metadata_migration.rs +++ b/server/src/migration/metadata_migration.rs @@ -126,7 +126,13 @@ pub fn update_v3(mut storage_metadata: JsonValue) -> JsonValue { pub async fn migrate_ingester_metadata() -> anyhow::Result<()> { let imp = ingestor_metadata_path(None); - let bytes = CONFIG.storage().get_object_store().get_object(&imp).await?; + let bytes = match CONFIG.storage().get_object_store().get_object(&imp).await { + Ok(bytes) => bytes, + Err(_) => { + log::debug!("No metadata found for ingester. So migration is not required"); + return Ok(()); + } + }; let mut json = serde_json::from_slice::(&bytes)?; let meta = json .as_object_mut() diff --git a/server/src/utils/arrow/flight.rs b/server/src/utils/arrow/flight.rs index 4f8273bca..8d42e137c 100644 --- a/server/src/utils/arrow/flight.rs +++ b/server/src/utils/arrow/flight.rs @@ -15,7 +15,8 @@ use tonic::transport::Channel; pub fn get_query_from_ticket(req: Request) -> Result { if CONFIG.parseable.mode == Mode::Ingest { - let query = serde_json::from_slice::(&req.into_inner().ticket) + let inner = req.into_inner().ticket; + let query = serde_json::from_slice::(&inner) .map_err(|_| Status::failed_precondition("Ticket is not valid json"))?["query"] .as_str() .ok_or_else(|| Status::failed_precondition("query is not valid string"))? 
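Note on the two migration paths above: the object-store copy (metadata_migration.rs) and the staging copy (staging.rs) apply the same upgrade rule — parse the stored ingestor metadata as loose JSON, backfill a missing `flight_port` key, then deserialize into `IngestorMetadata`. A minimal sketch of that rule, assuming only serde_json and anyhow; the helper name is illustrative, while the key name and string-encoded port mirror the patches:

    use serde_json::{json, Value};

    /// Backfill `flight_port` on metadata written before this series.
    /// Hypothetical helper, not part of the patches themselves.
    fn backfill_flight_port(mut meta: Value, flight_port: u16) -> anyhow::Result<Value> {
        let obj = meta
            .as_object_mut()
            .ok_or_else(|| anyhow::anyhow!("ingestor metadata is not a JSON object"))?;
        // Older metadata predates the Arrow Flight port, so insert a default.
        if obj.get("flight_port").is_none() {
            obj.insert("flight_port".to_owned(), json!(flight_port.to_string()));
        }
        Ok(meta)
    }

Backfilling before deserializing is presumably what lets `IngestorMetadata` keep `flight_port` as a required field rather than an `Option`.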
From 40f588431fb725d2e6fa5423925d3147fcec2039 Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Tue, 30 Apr 2024 17:00:35 +0530 Subject: [PATCH 11/18] add Query Execution metric in flight query --- server/src/handlers/airplane.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/server/src/handlers/airplane.rs b/server/src/handlers/airplane.rs index 4dda7f279..c18438bf9 100644 --- a/server/src/handlers/airplane.rs +++ b/server/src/handlers/airplane.rs @@ -6,6 +6,7 @@ use chrono::Utc; use datafusion::common::tree_node::TreeNode; use std::net::SocketAddr; use std::sync::Arc; +use std::time::Instant; use futures_util::{Future, TryFutureExt}; @@ -16,6 +17,7 @@ use crate::event::commit_schema; use crate::handlers::http::cluster::get_ingestor_info; use crate::handlers::http::fetch_schema; use crate::handlers::http::ingest::push_logs_unchecked; +use crate::metrics::QUERY_EXECUTE_TIME; use crate::option::{Mode, CONFIG}; use crate::handlers::livetail::cross_origin_config; @@ -189,6 +191,7 @@ impl FlightService for AirServiceImpl { Status::permission_denied("User Does not have permission to access this") })?; + let time = Instant::now(); let (results, _) = query .execute(stream_name.clone()) .await @@ -221,6 +224,12 @@ impl FlightService for AirServiceImpl { event.clear(&stream_name); } } + + let time = time.elapsed().as_secs_f64(); + QUERY_EXECUTE_TIME + .with_label_values(&["flight-query", &stream_name]) + .observe(time); + Ok(Response::new(Box::pin(output) as Self::DoGetStream)) } From 27887f29ea99f679c09819c73fbe22f73530c6c9 Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Thu, 2 May 2024 11:45:55 +0530 Subject: [PATCH 12/18] fix: prometheus issue --- server/src/handlers/airplane.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/handlers/airplane.rs b/server/src/handlers/airplane.rs index c18438bf9..a6a0a1c0a 100644 --- a/server/src/handlers/airplane.rs +++ b/server/src/handlers/airplane.rs @@ -227,7 +227,7 @@ impl FlightService for AirServiceImpl { let time = time.elapsed().as_secs_f64(); QUERY_EXECUTE_TIME - .with_label_values(&["flight-query", &stream_name]) + .with_label_values(&[&format!("flight-query-{}", stream_name)]) .observe(time); Ok(Response::new(Box::pin(output) as Self::DoGetStream)) } From 5cd498643c918d11595473a41510b6a668f7c731 Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Fri, 3 May 2024 12:54:25 +0530 Subject: [PATCH 13/18] debugging --- Cargo.lock | 33 ++++++++++++++++++++----- server/Cargo.toml | 2 +- server/src/event/writer.rs | 9 ++++++- server/src/event/writer/mem_writer.rs | 6 ++--- server/src/handlers/airplane.rs | 35 +++++++++++++++++++++------ 5 files changed, 67 insertions(+), 18 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bd1993ed9..3c1546564 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -71,7 +71,7 @@ dependencies = [ "tokio", "tokio-util", "tracing", - "zstd", + "zstd 0.13.1", ] [[package]] @@ -679,8 +679,8 @@ dependencies = [ "pin-project-lite", "tokio", "xz2", - "zstd", - "zstd-safe", + "zstd 0.13.1", + "zstd-safe 7.1.0", ] [[package]] @@ -1399,7 +1399,7 @@ dependencies = [ "url", "uuid", "xz2", - "zstd", + "zstd 0.13.1", ] [[package]] @@ -2869,7 +2869,7 @@ dependencies = [ "thrift", "tokio", "twox-hash", - "zstd", + "zstd 0.13.1", ] [[package]] @@ -4305,6 +4305,7 @@ dependencies = [ "axum", "base64 0.21.7", "bytes", + "flate2", "h2", "http 0.2.12", "http-body 0.4.6", @@ -4322,6 +4323,7 @@ dependencies = [ "tower-layer", "tower-service", "tracing", + "zstd 0.12.4", ] [[package]] @@ -5102,13
@@ dependencies = [ "flate2", ] +[[package]] +name = "zstd" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a27595e173641171fc74a1232b7b1c7a7cb6e18222c11e9dfb9888fa424c53c" +dependencies = [ + "zstd-safe 6.0.6", +] + [[package]] name = "zstd" version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2d789b1514203a1120ad2429eae43a7bd32b90976a7bb8a05f7ec02fa88cc23a" dependencies = [ - "zstd-safe", + "zstd-safe 7.1.0", +] + +[[package]] +name = "zstd-safe" +version = "6.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee98ffd0b48ee95e6c5168188e44a54550b1564d9d530ee21d5f0eaed1069581" +dependencies = [ + "libc", + "zstd-sys", ] [[package]] diff --git a/server/Cargo.toml b/server/Cargo.toml index 6620de080..4ca31bcc6 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -20,7 +20,7 @@ parquet = "51.0.0" ### LiveTail server deps arrow-flight = "51.0.0" -tonic = {version = "0.11.0", features = ["tls"] } +tonic = {version = "0.11.0", features = ["tls", "transport", "gzip", "zstd"] } tonic-web = "0.11.0" tower-http = { version = "0.4.4", features = ["cors"] } diff --git a/server/src/event/writer.rs b/server/src/event/writer.rs index 9783b4df8..0a67ab382 100644 --- a/server/src/event/writer.rs +++ b/server/src/event/writer.rs @@ -137,7 +137,14 @@ impl WriterTable { pub fn clear(&self, stream_name: &str) { let map = self.write().unwrap(); if let Some(writer) = map.get(stream_name) { - writer.lock().unwrap().mem.clear(); + let w = &mut writer.lock().unwrap().mem; + dbg!(&w.read_buffer.len()); + dbg!(&w.mutable_buffer.inner.len()); + + dbg!(&w.mutable_buffer.inner); + w.clear(); + dbg!(&w.read_buffer.len()); + dbg!(&w.mutable_buffer.inner.len()); } } diff --git a/server/src/event/writer/mem_writer.rs b/server/src/event/writer/mem_writer.rs index 6369834ff..561f2c4e5 100644 --- a/server/src/event/writer/mem_writer.rs +++ b/server/src/event/writer/mem_writer.rs @@ -34,8 +34,8 @@ pub struct MemWriter { schema: Schema, // for checking uniqueness of schema schema_map: HashSet, - read_buffer: Vec, - mutable_buffer: MutableBuffer, + pub read_buffer: Vec, + pub mutable_buffer: MutableBuffer, } impl Default for MemWriter { @@ -91,7 +91,7 @@ fn concat_records(schema: &Arc, record: &[RecordBatch]) -> RecordBatch { } #[derive(Debug, Default)] -struct MutableBuffer { +pub struct MutableBuffer { pub inner: Vec, pub rows: usize, } diff --git a/server/src/handlers/airplane.rs b/server/src/handlers/airplane.rs index a6a0a1c0a..c04fc404b 100644 --- a/server/src/handlers/airplane.rs +++ b/server/src/handlers/airplane.rs @@ -2,11 +2,14 @@ use arrow_array::RecordBatch; use arrow_flight::flight_service_server::FlightServiceServer; use arrow_flight::PollInfo; use arrow_schema::{ArrowError, Schema}; +use arrow_select::concat::concat_batches; use chrono::Utc; +use crossterm::event; use datafusion::common::tree_node::TreeNode; use std::net::SocketAddr; use std::sync::Arc; use std::time::Instant; +use tonic::codec::CompressionEncoding; use futures_util::{Future, TryFutureExt}; @@ -42,7 +45,7 @@ use crate::rbac::Users; const L_CURLY: char = '{'; const R_CURLY: char = '}'; -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct AirServiceImpl {} #[tonic::async_trait] @@ -158,7 +161,7 @@ impl FlightService for AirServiceImpl { let time_delta = query.end - Utc::now(); - let events = if CONFIG.parseable.mode == Mode::Query && time_delta.num_seconds() < 1 { + let events = if CONFIG.parseable.mode == Mode::Query 
&& time_delta.num_seconds() < 2 { let sql = format!( "{}\"query\": \"select * from {}\"{}", L_CURLY, &stream_name, R_CURLY @@ -172,6 +175,16 @@ impl FlightService for AirServiceImpl { let mut batches = run_do_get_rpc(im, sql.clone()).await?; minute_result.append(&mut batches); } + let mr = minute_result.iter().map(|rb| rb).collect::>(); + let schema = STREAM_INFO + .schema(&stream_name) + .map_err(|err| Status::failed_precondition(format!("Metadata Error: {}", err)))?; + let rb = concat_batches(&schema, mr) + .map_err(|err| Status::failed_precondition(format!("ArrowError: {}", err)))?; + + let event = push_logs_unchecked(rb, &stream_name) + .await + .map_err(|err| Status::internal(err.to_string()))?; let mut events = vec![]; for batch in minute_result { events.push( @@ -180,7 +193,7 @@ impl FlightService for AirServiceImpl { .map_err(|err| Status::internal(err.to_string()))?, ); } - Some(events) + Some(event) } else { None }; @@ -220,9 +233,10 @@ impl FlightService for AirServiceImpl { } let output = futures::stream::iter(flights.into_iter().map(Ok)); if let Some(events) = events { - for event in events { - event.clear(&stream_name); - } + events.clear(&stream_name); + // for event in events { + // event.clear(&stream_name); + // } } let time = time.elapsed().as_secs_f64(); @@ -280,7 +294,12 @@ pub fn server() -> impl Future impl Future impl Future Server::builder() + .max_frame_size(16 * 1024 * 1024 - 2) .accept_http1(true) .layer(cors) .layer(GrpcWebLayer::new()) From 321b0beae4f0f81a045bd75b9a2c62dfe3eb2017 Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Tue, 7 May 2024 17:49:24 +0530 Subject: [PATCH 14/18] fix: bug in sending data more than 4MB Need to update the `push_logs_unchecked` function as event processing has changed Need to clean up the `do_get` function for airplane --- server/src/event.rs | 14 +++- server/src/event/writer.rs | 99 +++++++++++++++++-------- server/src/handlers/airplane.rs | 113 ++++++++++++++++------------- server/src/handlers/http/ingest.rs | 21 +++--- server/src/handlers/http/query.rs | 3 + server/src/utils/arrow/flight.rs | 25 +++++++ 6 files changed, 178 insertions(+), 97 deletions(-) diff --git a/server/src/event.rs b/server/src/event.rs index 72a8fadc3..396a95a58 100644 --- a/server/src/event.rs +++ b/server/src/event.rs @@ -28,7 +28,7 @@ use std::sync::Arc; use self::error::EventError; pub use self::writer::STREAM_WRITERS; -use crate::metadata; +use crate::{handlers::http::ingest::PostError, metadata}; use chrono::NaiveDateTime; pub const DEFAULT_TIMESTAMP_KEY: &str = "p_timestamp"; @@ -86,12 +86,18 @@ impl Event { Ok(()) } - pub fn process_unchecked(&self) -> Result<(), EventError> { + pub fn process_unchecked(self) -> Result { let key = get_schema_key(&self.rb.schema().fields); - Self::process_event(&self.stream_name, &key, self.rb.clone(), self.parsed_timestamp)?; + Self::process_event( + &self.stream_name, + &key, + self.rb.clone(), + self.parsed_timestamp, + ) + .map_err(PostError::Event)?; - Ok(()) + Ok(self) } pub fn clear(&self, stream_name: &str) { diff --git a/server/src/event/writer.rs b/server/src/event/writer.rs index 0a67ab382..c972c628d 100644 --- a/server/src/event/writer.rs +++ b/server/src/event/writer.rs @@ -22,7 +22,7 @@ mod mem_writer; use std::{ collections::HashMap, - sync::{Arc, Mutex, RwLock}, + sync::{Arc, Mutex, RwLock, RwLockWriteGuard}, }; use crate::{ @@ -67,7 +67,6 @@ impl Writer { Ok(()) } - fn push_mem(&mut self, schema_key: &str, rb: RecordBatch) -> Result<(), StreamWriterError> { self.mem.push(schema_key, rb); 
Ok(()) @@ -90,47 +89,89 @@ impl WriterTable { match hashmap_guard.get(stream_name) { Some(stream_writer) => { - if CONFIG.parseable.mode != Mode::Query { - stream_writer.lock().unwrap().push( - stream_name, - schema_key, - record, + self.handle_existing_writer( + stream_writer, + stream_name, + schema_key, + record, parsed_timestamp, )?; - } else { - stream_writer - .lock() - .unwrap() - .push_mem(stream_name, record)?; - } } None => { drop(hashmap_guard); - let mut map = self.write().unwrap(); + let map = self.write().unwrap(); // check for race condition // if map contains entry then just - if let Some(writer) = map.get(stream_name) { - if CONFIG.parseable.mode != Mode::Query { - writer.lock().unwrap().push( - stream_name, - schema_key, - record, + self.handle_missing_writer( + map, + stream_name, + schema_key, + record, + parsed_timestamp, + )?; + } + }; + Ok(()) + } + + fn handle_existing_writer( + &self, + stream_writer: &Mutex, + stream_name: &str, + schema_key: &str, + record: RecordBatch, + parsed_timestamp: NaiveDateTime, + ) -> Result<(), StreamWriterError> { + if CONFIG.parseable.mode != Mode::Query { + stream_writer.lock().unwrap().push( + stream_name, + schema_key, + record, + parsed_timestamp, + )?; + } else { + stream_writer + .lock() + .unwrap() + .push_mem(stream_name, record)?; + } + + Ok(()) + } + + fn handle_missing_writer( + &self, + mut map: RwLockWriteGuard>>, + stream_name: &str, + schema_key: &str, + record: RecordBatch, + parsed_timestamp: NaiveDateTime, + ) -> Result<(), StreamWriterError> { + match map.get(stream_name) { + Some(writer) => { + if CONFIG.parseable.mode != Mode::Query { + writer.lock().unwrap().push( + stream_name, + schema_key, + record, parsed_timestamp, )?; - } else { - writer.lock().unwrap().push_mem(stream_name, record)?; - } - } else if CONFIG.parseable.mode != Mode::Query { + } else { + writer.lock().unwrap().push_mem(stream_name, record)?; + } + } + None => { + if CONFIG.parseable.mode != Mode::Query { let mut writer = Writer::default(); writer.push(stream_name, schema_key, record, parsed_timestamp)?; map.insert(stream_name.to_owned(), Mutex::new(writer)); } else { let mut writer = Writer::default(); - writer.push(stream_name, schema_key, record, parsed_timestamp)?; + writer.push_mem(schema_key, record)?; map.insert(stream_name.to_owned(), Mutex::new(writer)); } } - }; + } Ok(()) } @@ -138,13 +179,7 @@ impl WriterTable { let map = self.write().unwrap(); if let Some(writer) = map.get(stream_name) { let w = &mut writer.lock().unwrap().mem; - dbg!(&w.read_buffer.len()); - dbg!(&w.mutable_buffer.inner.len()); - - dbg!(&w.mutable_buffer.inner); w.clear(); - dbg!(&w.read_buffer.len()); - dbg!(&w.mutable_buffer.inner.len()); } } diff --git a/server/src/handlers/airplane.rs b/server/src/handlers/airplane.rs index c04fc404b..3a729e2d2 100644 --- a/server/src/handlers/airplane.rs +++ b/server/src/handlers/airplane.rs @@ -1,10 +1,9 @@ use arrow_array::RecordBatch; +use arrow_flight::encode::FlightDataEncoderBuilder; use arrow_flight::flight_service_server::FlightServiceServer; use arrow_flight::PollInfo; use arrow_schema::{ArrowError, Schema}; -use arrow_select::concat::concat_batches; use chrono::Utc; -use crossterm::event; use datafusion::common::tree_node::TreeNode; use std::net::SocketAddr; use std::sync::Arc; @@ -19,7 +18,6 @@ use tonic_web::GrpcWebLayer; use crate::event::commit_schema; use crate::handlers::http::cluster::get_ingestor_info; use crate::handlers::http::fetch_schema; -use crate::handlers::http::ingest::push_logs_unchecked; use 
crate::metrics::QUERY_EXECUTE_TIME; use crate::option::{Mode, CONFIG}; @@ -28,14 +26,14 @@ use crate::handlers::livetail::cross_origin_config; use crate::handlers::http::query::{authorize_and_set_filter_tags, into_query}; use crate::query::{TableScanVisitor, QUERY_SESSION}; use crate::storage::object_storage::commit_schema_to_storage; -use crate::utils::arrow::flight::{get_query_from_ticket, run_do_get_rpc}; +use crate::utils::arrow::flight::{append_temporary_events, get_query_from_ticket, run_do_get_rpc}; use arrow_flight::{ flight_service_server::FlightService, Action, ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightInfo, HandshakeRequest, HandshakeResponse, PutResult, SchemaAsIpc, SchemaResult, Ticket, }; -use arrow_ipc::writer::{DictionaryTracker, IpcDataGenerator, IpcWriteOptions}; -use futures::stream::BoxStream; +use arrow_ipc::writer::IpcWriteOptions; +use futures::{stream, TryStreamExt}; use tonic::{Request, Response, Status, Streaming}; use crate::handlers::livetail::extract_session_key; @@ -50,13 +48,13 @@ pub struct AirServiceImpl {} #[tonic::async_trait] impl FlightService for AirServiceImpl { - type HandshakeStream = BoxStream<'static, Result>; - type ListFlightsStream = BoxStream<'static, Result>; - type DoGetStream = BoxStream<'static, Result>; - type DoPutStream = BoxStream<'static, Result>; - type DoActionStream = BoxStream<'static, Result>; - type ListActionsStream = BoxStream<'static, Result>; - type DoExchangeStream = BoxStream<'static, Result>; + type HandshakeStream = stream::BoxStream<'static, Result>; + type ListFlightsStream = stream::BoxStream<'static, Result>; + type DoGetStream = stream::BoxStream<'static, Result>; + type DoPutStream = stream::BoxStream<'static, Result>; + type DoActionStream = stream::BoxStream<'static, Result>; + type ListActionsStream = stream::BoxStream<'static, Result>; + type DoExchangeStream = stream::BoxStream<'static, Result>; async fn handshake( &self, @@ -175,24 +173,27 @@ impl FlightService for AirServiceImpl { let mut batches = run_do_get_rpc(im, sql.clone()).await?; minute_result.append(&mut batches); } - let mr = minute_result.iter().map(|rb| rb).collect::>(); - let schema = STREAM_INFO - .schema(&stream_name) - .map_err(|err| Status::failed_precondition(format!("Metadata Error: {}", err)))?; - let rb = concat_batches(&schema, mr) - .map_err(|err| Status::failed_precondition(format!("ArrowError: {}", err)))?; - - let event = push_logs_unchecked(rb, &stream_name) - .await - .map_err(|err| Status::internal(err.to_string()))?; - let mut events = vec![]; - for batch in minute_result { - events.push( - push_logs_unchecked(batch, &stream_name) - .await - .map_err(|err| Status::internal(err.to_string()))?, - ); - } + let mr = minute_result.iter().collect::>(); + dbg!(&mr.len()); + let event = append_temporary_events(&stream_name, mr).await?; + + // let schema = STREAM_INFO + // .schema(&stream_name) + // .map_err(|err| Status::failed_precondition(format!("Metadata Error: {}", err)))?; + // let rb = concat_batches(&schema, mr) + // .map_err(|err| Status::failed_precondition(format!("ArrowError: {}", err)))?; + // + // let event = push_logs_unchecked(rb, &stream_name) + // .await + // .map_err(|err| Status::internal(err.to_string()))?; + // let mut events = vec![]; + // for batch in minute_result { + // events.push( + // push_logs_unchecked(batch, &stream_name) + // .await + // .map_err(|err| Status::internal(err.to_string()))?, + // ); + // } Some(event) } else { None @@ -210,28 +211,39 @@ impl FlightService for 
AirServiceImpl { .await .map_err(|err| Status::internal(err.to_string())) .unwrap(); + if results.len() > 1 { + dbg!(&results.len()); + } let schemas = results .iter() .map(|batch| batch.schema()) .map(|s| s.as_ref().clone()) .collect::>(); - let schema = Schema::try_merge(schemas).map_err(|err| Status::internal(err.to_string()))?; - let options = datafusion::arrow::ipc::writer::IpcWriteOptions::default(); - let schema_flight_data = SchemaAsIpc::new(&schema, &options); - - let mut flights = vec![FlightData::from(schema_flight_data)]; - let encoder = IpcDataGenerator::default(); - let mut tracker = DictionaryTracker::new(false); - for batch in &results { - let (flight_dictionaries, flight_batch) = encoder - .encoded_batch(batch, &mut tracker, &options) - .map_err(|e| Status::internal(e.to_string()))?; - flights.extend(flight_dictionaries.into_iter().map(Into::into)); - flights.push(flight_batch.into()); - } - let output = futures::stream::iter(flights.into_iter().map(Ok)); + let input_stream = futures::stream::iter(results.into_iter().map(Ok)); + + let flight_data_stream = FlightDataEncoderBuilder::new() + .with_max_flight_data_size(usize::MAX) + .with_schema(schema.into()) + .build(input_stream); + + let flight_data_stream = flight_data_stream.map_err(|err| Status::unknown(err.to_string())); + + // let options = datafusion::arrow::ipc::writer::IpcWriteOptions::default(); + // let schema_flight_data = SchemaAsIpc::new(&schema, &options); + // + // let mut flights = vec![FlightData::from(schema_flight_data)]; + // let encoder = IpcDataGenerator::default(); + // let mut tracker = DictionaryTracker::new(false); + // for batch in &results { + // let (flight_dictionaries, flight_batch) = encoder + // .encoded_batch(batch, &mut tracker, &options) + // .map_err(|e| Status::internal(e.to_string()))?; + // flights.extend(flight_dictionaries.into_iter().map(Into::into)); + // flights.push(flight_batch.into()); + // } + // let output = futures::stream::iter(flights.into_iter().map(Ok)); if let Some(events) = events { events.clear(&stream_name); // for event in events { @@ -244,7 +256,9 @@ impl FlightService for AirServiceImpl { .with_label_values(&[&format!("flight-query-{}", stream_name)]) .observe(time); - Ok(Response::new(Box::pin(output) as Self::DoGetStream)) + Ok(Response::new( + Box::pin(flight_data_stream) as Self::DoGetStream + )) } async fn do_put( @@ -297,9 +311,8 @@ pub fn server() -> impl Future Result { - todo!("timepartition fix"); - - // let event = event::Event { - // rb: batches, - // stream_name: stream_name.to_string(), - // origin_format: "json", - // origin_size: 0, - // is_first_event: true, - // }; - // event.process_unchecked()?; - // Ok(event) + event::Event { + rb: batches, + stream_name: stream_name.to_string(), + origin_format: "json", + origin_size: 0, + parsed_timestamp: Utc::now().naive_utc(), + time_partition: None, + is_first_event: true, // NOTE: Maybe should be false + } + .process_unchecked() } async fn push_logs(stream_name: String, req: HttpRequest, body: Bytes) -> Result<(), PostError> { diff --git a/server/src/handlers/http/query.rs b/server/src/handlers/http/query.rs index acfc6191b..0c40e0248 100644 --- a/server/src/handlers/http/query.rs +++ b/server/src/handlers/http/query.rs @@ -99,6 +99,9 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result 1 { + dbg!(&records.len()); + } let response = QueryResponse { records, fields, diff --git a/server/src/utils/arrow/flight.rs b/server/src/utils/arrow/flight.rs index 8d42e137c..ddaea4efc 100644 --- 
a/server/src/utils/arrow/flight.rs +++ b/server/src/utils/arrow/flight.rs @@ -1,10 +1,14 @@ +use crate::event::Event; +use crate::handlers::http::ingest::push_logs_unchecked; use crate::handlers::http::query::Query as QueryJson; +use crate::metadata::STREAM_INFO; use crate::{ handlers::http::modal::IngestorMetadata, option::{Mode, CONFIG}, }; use arrow_array::RecordBatch; use arrow_flight::Ticket; +use arrow_select::concat::concat_batches; use futures::TryStreamExt; use serde_json::Value as JsonValue; use tonic::{Request, Status}; @@ -66,3 +70,24 @@ pub async fn run_do_get_rpc(im: IngestorMetadata, sql: String) -> Result, +) -> Result< + //Vec + Event, + Status, +> { + let schema = STREAM_INFO + .schema(stream_name) + .map_err(|err| Status::failed_precondition(format!("Metadata Error: {}", err)))?; + let rb = concat_batches(&schema, minute_result) + .map_err(|err| Status::failed_precondition(format!("ArrowError: {}", err)))?; + + let event = push_logs_unchecked(rb, stream_name) + .await + .map_err(|err| Status::internal(err.to_string()))?; + Ok(event) +} From fe63529059b5dbfee00e7688427997207a949d1f Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Thu, 9 May 2024 16:13:27 +0530 Subject: [PATCH 15/18] arrow flight debugging --- server/src/event/writer.rs | 8 +---- server/src/handlers/airplane.rs | 56 +++++++------------------------ server/src/handlers/http/query.rs | 4 +-- server/src/utils/arrow/flight.rs | 10 +++++- 4 files changed, 23 insertions(+), 55 deletions(-) diff --git a/server/src/event/writer.rs b/server/src/event/writer.rs index c972c628d..ce0bf4372 100644 --- a/server/src/event/writer.rs +++ b/server/src/event/writer.rs @@ -102,13 +102,7 @@ impl WriterTable { let map = self.write().unwrap(); // check for race condition // if map contains entry then just - self.handle_missing_writer( - map, - stream_name, - schema_key, - record, - parsed_timestamp, - )?; + self.handle_missing_writer(map, stream_name, schema_key, record, parsed_timestamp)?; } }; Ok(()) diff --git a/server/src/handlers/airplane.rs b/server/src/handlers/airplane.rs index 3a729e2d2..d20a1b930 100644 --- a/server/src/handlers/airplane.rs +++ b/server/src/handlers/airplane.rs @@ -3,6 +3,7 @@ use arrow_flight::encode::FlightDataEncoderBuilder; use arrow_flight::flight_service_server::FlightServiceServer; use arrow_flight::PollInfo; use arrow_schema::{ArrowError, Schema}; + use chrono::Utc; use datafusion::common::tree_node::TreeNode; use std::net::SocketAddr; @@ -18,6 +19,7 @@ use tonic_web::GrpcWebLayer; use crate::event::commit_schema; use crate::handlers::http::cluster::get_ingestor_info; use crate::handlers::http::fetch_schema; + use crate::metrics::QUERY_EXECUTE_TIME; use crate::option::{Mode, CONFIG}; @@ -159,7 +161,7 @@ impl FlightService for AirServiceImpl { let time_delta = query.end - Utc::now(); - let events = if CONFIG.parseable.mode == Mode::Query && time_delta.num_seconds() < 2 { + let event = if CONFIG.parseable.mode == Mode::Query && time_delta.num_seconds() < 2 { let sql = format!( "{}\"query\": \"select * from {}\"{}", L_CURLY, &stream_name, R_CURLY @@ -174,31 +176,12 @@ impl FlightService for AirServiceImpl { minute_result.append(&mut batches); } let mr = minute_result.iter().collect::>(); - dbg!(&mr.len()); let event = append_temporary_events(&stream_name, mr).await?; - // let schema = STREAM_INFO - // .schema(&stream_name) - // .map_err(|err| Status::failed_precondition(format!("Metadata Error: {}", err)))?; - // let rb = concat_batches(&schema, mr) - // .map_err(|err| 
Status::failed_precondition(format!("ArrowError: {}", err)))?; - // - // let event = push_logs_unchecked(rb, &stream_name) - // .await - // .map_err(|err| Status::internal(err.to_string()))?; - // let mut events = vec![]; - // for batch in minute_result { - // events.push( - // push_logs_unchecked(batch, &stream_name) - // .await - // .map_err(|err| Status::internal(err.to_string()))?, - // ); - // } Some(event) } else { None }; - let permissions = Users.get_permissions(&key); authorize_and_set_filter_tags(&mut query, permissions, &stream_name).map_err(|_| { @@ -211,44 +194,29 @@ impl FlightService for AirServiceImpl { .await .map_err(|err| Status::internal(err.to_string())) .unwrap(); - if results.len() > 1 { - dbg!(&results.len()); - } let schemas = results .iter() .map(|batch| batch.schema()) .map(|s| s.as_ref().clone()) .collect::>(); - let schema = Schema::try_merge(schemas).map_err(|err| Status::internal(err.to_string()))?; + let _schema = + Schema::try_merge(schemas).map_err(|err| Status::internal(err.to_string()))?; let input_stream = futures::stream::iter(results.into_iter().map(Ok)); + let write_options = IpcWriteOptions::default() + .try_with_compression(Some(arrow_ipc::CompressionType(1))) + .map_err(|err| Status::failed_precondition(err.to_string()))?; let flight_data_stream = FlightDataEncoderBuilder::new() .with_max_flight_data_size(usize::MAX) - .with_schema(schema.into()) + .with_options(write_options) + // .with_schema(schema.into()) .build(input_stream); let flight_data_stream = flight_data_stream.map_err(|err| Status::unknown(err.to_string())); - // let options = datafusion::arrow::ipc::writer::IpcWriteOptions::default(); - // let schema_flight_data = SchemaAsIpc::new(&schema, &options); - // - // let mut flights = vec![FlightData::from(schema_flight_data)]; - // let encoder = IpcDataGenerator::default(); - // let mut tracker = DictionaryTracker::new(false); - // for batch in &results { - // let (flight_dictionaries, flight_batch) = encoder - // .encoded_batch(batch, &mut tracker, &options) - // .map_err(|e| Status::internal(e.to_string()))?; - // flights.extend(flight_dictionaries.into_iter().map(Into::into)); - // flights.push(flight_batch.into()); - // } - // let output = futures::stream::iter(flights.into_iter().map(Ok)); - if let Some(events) = events { - events.clear(&stream_name); - // for event in events { - // event.clear(&stream_name); - // } + if let Some(event) = event { + event.clear(&stream_name); } let time = time.elapsed().as_secs_f64(); diff --git a/server/src/handlers/http/query.rs b/server/src/handlers/http/query.rs index 0c40e0248..534db8075 100644 --- a/server/src/handlers/http/query.rs +++ b/server/src/handlers/http/query.rs @@ -99,9 +99,7 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result 1 { - dbg!(&records.len()); - } + let response = QueryResponse { records, fields, diff --git a/server/src/utils/arrow/flight.rs b/server/src/utils/arrow/flight.rs index ddaea4efc..e23f9d10d 100644 --- a/server/src/utils/arrow/flight.rs +++ b/server/src/utils/arrow/flight.rs @@ -59,7 +59,15 @@ pub async fn run_do_get_rpc(im: IngestorMetadata, sql: String) -> Result Date: Thu, 9 May 2024 17:25:35 +0530 Subject: [PATCH 16/18] fix add compression features for arrow flight Client needs to have the correct decoders too(zstd) --- Cargo.lock | 8 ++++++++ server/Cargo.toml | 6 ++---- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3c1546564..3c7099b30 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -545,11 
+545,18 @@ version = "51.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3241ce691192d789b7b94f56a10e166ee608bdc3932c759eb0b85f09235352bb" dependencies = [ + "anyhow", + "arrow-arith", "arrow-array", "arrow-buffer", "arrow-cast", + "arrow-data", "arrow-ipc", + "arrow-ord", + "arrow-row", "arrow-schema", + "arrow-select", + "arrow-string", "base64 0.22.0", "bytes", "futures", @@ -573,6 +580,7 @@ dependencies = [ "arrow-schema", "flatbuffers", "lz4_flex", + "zstd 0.13.1", ] [[package]] diff --git a/server/Cargo.toml b/server/Cargo.toml index 4ca31bcc6..01011ef5d 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -12,14 +12,12 @@ build = "build.rs" arrow-schema = { version = "51.0.0", features = ["serde"] } arrow-array = { version = "51.0.0" } arrow-json = "51.0.0" -arrow-ipc = "51.0.0" +arrow-ipc = { version = "51.0.0", features = ["zstd"] } arrow-select = "51.0.0" datafusion = "37.1.0" object_store = { version = "0.9.1", features = ["cloud", "aws"] } parquet = "51.0.0" - -### LiveTail server deps -arrow-flight = "51.0.0" +arrow-flight = { version = "51.0.0", features = ["anyhow", "tls", "arrow-arith", "arrow-data", "arrow-ord", "arrow-row", "arrow-select", "arrow-string"] } tonic = {version = "0.11.0", features = ["tls", "transport", "gzip", "zstd"] } tonic-web = "0.11.0" tower-http = { version = "0.4.4", features = ["cors"] } From bc7ab708213022b79f9ab43f2778d80f9f4c1f85 Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Mon, 13 May 2024 12:24:20 +0530 Subject: [PATCH 17/18] fix: issue when any ingesters are offline --- Cargo.lock | 7 ------- server/Cargo.toml | 2 +- server/src/handlers/airplane.rs | 5 +++-- 3 files changed, 4 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3c7099b30..11f113ff7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -545,18 +545,11 @@ version = "51.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3241ce691192d789b7b94f56a10e166ee608bdc3932c759eb0b85f09235352bb" dependencies = [ - "anyhow", - "arrow-arith", "arrow-array", "arrow-buffer", "arrow-cast", - "arrow-data", "arrow-ipc", - "arrow-ord", - "arrow-row", "arrow-schema", - "arrow-select", - "arrow-string", "base64 0.22.0", "bytes", "futures", diff --git a/server/Cargo.toml b/server/Cargo.toml index 01011ef5d..5d9eb69e2 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -17,7 +17,7 @@ arrow-select = "51.0.0" datafusion = "37.1.0" object_store = { version = "0.9.1", features = ["cloud", "aws"] } parquet = "51.0.0" -arrow-flight = { version = "51.0.0", features = ["anyhow", "tls", "arrow-arith", "arrow-data", "arrow-ord", "arrow-row", "arrow-select", "arrow-string"] } +arrow-flight = { version = "51.0.0", features = [ "tls" ] } tonic = {version = "0.11.0", features = ["tls", "transport", "gzip", "zstd"] } tonic-web = "0.11.0" tower-http = { version = "0.4.4", features = ["cors"] } diff --git a/server/src/handlers/airplane.rs b/server/src/handlers/airplane.rs index d20a1b930..b0ba8cfcb 100644 --- a/server/src/handlers/airplane.rs +++ b/server/src/handlers/airplane.rs @@ -172,8 +172,9 @@ impl FlightService for AirServiceImpl { let mut minute_result: Vec = vec![]; for im in ingester_metadatas { - let mut batches = run_do_get_rpc(im, sql.clone()).await?; - minute_result.append(&mut batches); + if let Ok(mut batches) = run_do_get_rpc(im, sql.clone()).await { + minute_result.append(&mut batches); + } } let mr = minute_result.iter().collect::>(); let event = append_temporary_events(&stream_name, mr).await?; From 
5775e928fd74b9684139ad32c222c1ad85e26644 Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Wed, 15 May 2024 15:10:38 +0530 Subject: [PATCH 18/18] fix: response when user gives end time greater than current time If the user gives an end time that is greater than `Utc::now()`, the response was not correct --- server/src/handlers/airplane.rs | 73 ++++++++++++---------- server/src/query.rs | 2 +- server/src/query/stream_schema_provider.rs | 2 +- server/src/utils/arrow/flight.rs | 70 +++++++++++++-------- 4 files changed, 86 insertions(+), 61 deletions(-) diff --git a/server/src/handlers/airplane.rs b/server/src/handlers/airplane.rs index b0ba8cfcb..8fd83f35d 100644 --- a/server/src/handlers/airplane.rs +++ b/server/src/handlers/airplane.rs @@ -2,10 +2,10 @@ use arrow_array::RecordBatch; use arrow_flight::encode::FlightDataEncoderBuilder; use arrow_flight::flight_service_server::FlightServiceServer; use arrow_flight::PollInfo; -use arrow_schema::{ArrowError, Schema}; +use arrow_schema::ArrowError; -use chrono::Utc; use datafusion::common::tree_node::TreeNode; +use serde_json::json; use std::net::SocketAddr; use std::sync::Arc; use std::time::Instant; @@ -28,7 +28,9 @@ use crate::handlers::livetail::cross_origin_config; use crate::handlers::http::query::{authorize_and_set_filter_tags, into_query}; use crate::query::{TableScanVisitor, QUERY_SESSION}; use crate::storage::object_storage::commit_schema_to_storage; -use crate::utils::arrow::flight::{append_temporary_events, get_query_from_ticket, run_do_get_rpc}; +use crate::utils::arrow::flight::{ + append_temporary_events, get_query_from_ticket, run_do_get_rpc, send_to_ingester, +}; use arrow_flight::{ flight_service_server::FlightService, Action, ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightInfo, HandshakeRequest, HandshakeResponse, PutResult, SchemaAsIpc, @@ -42,9 +44,6 @@ use crate::handlers::livetail::extract_session_key; use crate::metadata::STREAM_INFO; use crate::rbac::Users; -const L_CURLY: char = '{'; -const R_CURLY: char = '}'; - #[derive(Clone, Debug)] pub struct AirServiceImpl {} @@ -159,50 +158,56 @@ impl FlightService for AirServiceImpl { .first_table_name() .ok_or_else(|| Status::invalid_argument("Malformed Query"))?; - let time_delta = query.end - Utc::now(); - - let event = if CONFIG.parseable.mode == Mode::Query && time_delta.num_seconds() < 2 { - let sql = format!( - "{}\"query\": \"select * from {}\"{}", - L_CURLY, &stream_name, R_CURLY - ); - let ingester_metadatas = get_ingestor_info() - .await - .map_err(|err| Status::failed_precondition(err.to_string()))?; - let mut minute_result: Vec = vec![]; - - for im in ingester_metadatas { - if let Ok(mut batches) = run_do_get_rpc(im, sql.clone()).await { - minute_result.append(&mut batches); + let event = + if send_to_ingester(query.start.timestamp_millis(), query.end.timestamp_millis()) { + let sql = format!("select * from {}", &stream_name); + let start_time = ticket.start_time.clone(); + let end_time = ticket.end_time.clone(); + let out_ticket = json!({ + "query": sql, + "startTime": start_time, + "endTime": end_time + }) + .to_string(); + + let ingester_metadatas = get_ingestor_info() + .await + .map_err(|err| Status::failed_precondition(err.to_string()))?; + let mut minute_result: Vec = vec![]; + + for im in ingester_metadatas { + if let Ok(mut batches) = run_do_get_rpc(im, out_ticket.clone()).await { + minute_result.append(&mut batches); + } } - } - let mr = minute_result.iter().collect::>(); - let event = append_temporary_events(&stream_name, mr).await?; -
Some(event) - } else { - None - }; + let mr = minute_result.iter().collect::>(); + let event = append_temporary_events(&stream_name, mr).await?; + Some(event) + } else { + None + }; let permissions = Users.get_permissions(&key); authorize_and_set_filter_tags(&mut query, permissions, &stream_name).map_err(|_| { Status::permission_denied("User Does not have permission to access this") })?; - let time = Instant::now(); let (results, _) = query .execute(stream_name.clone()) .await - .map_err(|err| Status::internal(err.to_string())) - .unwrap(); + .map_err(|err| Status::internal(err.to_string()))?; + + /* + * INFO: Not returning the schema with the data. + * Kept it in case it needs to be sent in the future. let schemas = results .iter() .map(|batch| batch.schema()) .map(|s| s.as_ref().clone()) .collect::>(); - let _schema = - Schema::try_merge(schemas).map_err(|err| Status::internal(err.to_string()))?; + let schema = Schema::try_merge(schemas).map_err(|err| Status::internal(err.to_string()))?; + */ let input_stream = futures::stream::iter(results.into_iter().map(Ok)); let write_options = IpcWriteOptions::default() .try_with_compression(Some(arrow_ipc::CompressionType(1))) diff --git a/server/src/query.rs b/server/src/query.rs index aaac8d1cf..ce80ba2e1 100644 --- a/server/src/query.rs +++ b/server/src/query.rs @@ -18,7 +18,7 @@ mod filter_optimizer; mod listing_table_builder; -mod stream_schema_provider; +pub mod stream_schema_provider; use chrono::{DateTime, Utc}; use chrono::{NaiveDateTime, TimeZone}; diff --git a/server/src/query/stream_schema_provider.rs b/server/src/query/stream_schema_provider.rs index 43e58ce3a..ef0eb69ba 100644 --- a/server/src/query/stream_schema_provider.rs +++ b/server/src/query/stream_schema_provider.rs @@ -607,7 +607,7 @@ fn is_overlapping_query( false } -fn include_now(filters: &[Expr], time_partition: Option) -> bool { +pub fn include_now(filters: &[Expr], time_partition: Option) -> bool { let current_minute = Utc::now() .with_second(0) .and_then(|x| x.with_nanosecond(0)) diff --git a/server/src/utils/arrow/flight.rs b/server/src/utils/arrow/flight.rs index e23f9d10d..113407c9d 100644 --- a/server/src/utils/arrow/flight.rs +++ b/server/src/utils/arrow/flight.rs @@ -2,15 +2,20 @@ use crate::event::Event; use crate::handlers::http::ingest::push_logs_unchecked; use crate::handlers::http::query::Query as QueryJson; use crate::metadata::STREAM_INFO; +use crate::query::stream_schema_provider::include_now; use crate::{ handlers::http::modal::IngestorMetadata, option::{Mode, CONFIG}, }; + use arrow_array::RecordBatch; use arrow_flight::Ticket; use arrow_select::concat::concat_batches; +use datafusion::logical_expr::BinaryExpr; +use datafusion::prelude::Expr; +use datafusion::scalar::ScalarValue; use futures::TryStreamExt; -use serde_json::Value as JsonValue; + use tonic::{Request, Status}; use arrow_flight::FlightClient; @@ -18,31 +23,14 @@ use http::Uri; use tonic::transport::Channel; pub fn get_query_from_ticket(req: Request) -> Result { - if CONFIG.parseable.mode == Mode::Ingest { - let inner = req.into_inner().ticket; - let query = serde_json::from_slice::(&inner) - .map_err(|_| Status::failed_precondition("Ticket is not valid json"))?["query"] - .as_str() - .ok_or_else(|| Status::failed_precondition("query is not valid string"))?
- .to_owned(); - Ok(QueryJson { - query, - send_null: false, - fields: false, - filter_tags: None, - // we can use humantime because into_query handle parseing - end_time: String::from("now"), - start_time: String::from("1min"), - }) - } else { - Ok( - serde_json::from_slice::(&req.into_inner().ticket) - .map_err(|err| Status::internal(err.to_string()))?, - ) - } + serde_json::from_slice::(&req.into_inner().ticket) + .map_err(|err| Status::internal(err.to_string())) } -pub async fn run_do_get_rpc(im: IngestorMetadata, sql: String) -> Result, Status> { +pub async fn run_do_get_rpc( + im: IngestorMetadata, + ticket: String, +) -> Result, Status> { let url = im .domain_name .rsplit_once(':') @@ -72,7 +60,7 @@ pub async fn run_do_get_rpc(im: IngestorMetadata, sql: String) -> Result bool { + let filter_start = lit_timestamp_milli( + start, //query.start.timestamp_millis() + ); + let filter_end = lit_timestamp_milli( + end, //query.end.timestamp_millis() + ); + + let expr_left = Expr::Column(datafusion::common::Column { + relation: None, + name: "p_timestamp".to_owned(), + }); + + let ex1 = BinaryExpr::new( + Box::new(expr_left.clone()), + datafusion::logical_expr::Operator::Gt, + Box::new(filter_start), + ); + let ex2 = BinaryExpr::new( + Box::new(expr_left), + datafusion::logical_expr::Operator::Lt, + Box::new(filter_end), + ); + let ex = [Expr::BinaryExpr(ex1), Expr::BinaryExpr(ex2)]; + + CONFIG.parseable.mode == Mode::Query && include_now(&ex, None) +} + +fn lit_timestamp_milli(time: i64) -> Expr { + Expr::Literal(ScalarValue::TimestampMillisecond(Some(time), None)) +}
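Taken together, the series leaves do_get accepting a plain JSON ticket with `query`, `startTime`, and `endTime` keys (the same shape the `out_ticket` above is built with) and replying with zstd-compressed Arrow IPC frames. A hedged client-side sketch of exercising it with the arrow-flight `FlightClient`; the endpoint, stream name, and time range are placeholders, tokio is assumed as the runtime, and the client's arrow-ipc build is assumed to enable the zstd feature, as patch 16 does server-side:

    use arrow_flight::{FlightClient, Ticket};
    use futures::TryStreamExt;
    use tonic::transport::Channel;

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        // Placeholder endpoint: P_FLIGHT_PORT defaults to 8002 earlier in the series.
        let channel = Channel::from_static("http://localhost:8002")
            .connect()
            .await?;
        let mut client = FlightClient::new(channel);

        // Same ticket shape get_query_from_ticket deserializes above;
        // into_query resolves humantime strings such as "1min" and "now".
        let ticket = Ticket::new(
            serde_json::json!({
                "query": "select * from demo",
                "startTime": "1min",
                "endTime": "now"
            })
            .to_string(),
        );

        // do_get yields a stream of decoded RecordBatches.
        let batches: Vec<arrow_array::RecordBatch> =
            client.do_get(ticket).await?.try_collect().await?;
        println!("received {} record batches", batches.len());
        Ok(())
    }

Two layers of compression are in play here: the IPC body compression set via try_with_compression, which every consumer must be able to decode (the decoder caveat the patch 16 message points at), and the optional tonic transport compression, which only applies to clients that advertise it via accept_compressed on the underlying FlightServiceClient.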