diff --git a/server/src/catalog.rs b/server/src/catalog.rs
index 3ffdd21a1..f44159612 100644
--- a/server/src/catalog.rs
+++ b/server/src/catalog.rs
@@ -23,6 +23,7 @@ use relative_path::RelativePathBuf;
 
 use crate::{
     catalog::manifest::Manifest,
+    event::DEFAULT_TIMESTAMP_KEY,
     query::PartialTimeFilter,
     storage::{ObjectStorage, ObjectStorageError},
 };
@@ -69,25 +70,46 @@ impl ManifestFile for manifest::File {
     }
 }
 
-fn get_file_bounds(file: &manifest::File) -> (DateTime<Utc>, DateTime<Utc>) {
-    match file
-        .columns()
-        .iter()
-        .find(|col| col.name == "p_timestamp")
-        .unwrap()
-        .stats
-        .clone()
-        .unwrap()
-    {
-        column::TypedStatistics::Int(stats) => (
-            NaiveDateTime::from_timestamp_millis(stats.min)
-                .unwrap()
-                .and_utc(),
-            NaiveDateTime::from_timestamp_millis(stats.max)
-                .unwrap()
-                .and_utc(),
-        ),
-        _ => unreachable!(),
+fn get_file_bounds(
+    file: &manifest::File,
+    partition_column: String,
+) -> (DateTime<Utc>, DateTime<Utc>) {
+    if partition_column == DEFAULT_TIMESTAMP_KEY {
+        match file
+            .columns()
+            .iter()
+            .find(|col| col.name == partition_column)
+            .unwrap()
+            .stats
+            .as_ref()
+            .unwrap()
+        {
+            column::TypedStatistics::Int(stats) => (
+                NaiveDateTime::from_timestamp_millis(stats.min)
+                    .unwrap()
+                    .and_utc(),
+                NaiveDateTime::from_timestamp_millis(stats.max)
+                    .unwrap()
+                    .and_utc(),
+            ),
+            _ => unreachable!(),
+        }
+    } else {
+        match file
+            .columns()
+            .iter()
+            .find(|col| col.name == partition_column)
+            .unwrap()
+            .stats
+            .as_ref()
+            .unwrap()
+        {
+            column::TypedStatistics::String(stats) => (
+                stats.min.parse::<DateTime<Utc>>().unwrap(),
+                stats.max.parse::<DateTime<Utc>>().unwrap(),
+            ),
+            _ => unreachable!(),
+        }
     }
 }
 
@@ -97,10 +119,19 @@ pub async fn update_snapshot(
     change: manifest::File,
 ) -> Result<(), ObjectStorageError> {
     // get current snapshot
-    let mut meta = storage.get_snapshot(stream_name).await?;
-    let manifests = &mut meta.manifest_list;
-
-    let (lower_bound, _) = get_file_bounds(&change);
+    let mut meta = storage.get_object_store_format(stream_name).await?;
+    let manifests = &mut meta.snapshot.manifest_list;
+    let time_partition = meta.time_partition;
+    let lower_bound = match time_partition {
+        Some(time_partition) => {
+            let (lower_bound, _) = get_file_bounds(&change, time_partition);
+            lower_bound
+        }
+        None => {
+            let (lower_bound, _) = get_file_bounds(&change, DEFAULT_TIMESTAMP_KEY.to_string());
+            lower_bound
+        }
+    };
     let pos = manifests.iter().position(|item| {
         item.time_lower_bound <= lower_bound && lower_bound < item.time_upper_bound
     });
@@ -109,8 +140,10 @@ pub async fn update_snapshot(
     // This updates an existing file so there is no need to create a snapshot entry.
     if let Some(pos) = pos {
         let info = &mut manifests[pos];
-        let path = partition_path(stream_name, info.time_lower_bound, info.time_upper_bound);
-        let Some(mut manifest) = storage.get_manifest(&path).await? else {
+        let manifest_path =
+            partition_path(stream_name, info.time_lower_bound, info.time_upper_bound);
+
+        let Some(mut manifest) = storage.get_manifest(&manifest_path).await? else {
             return Err(ObjectStorageError::UnhandledError(
                 "Manifest found in snapshot but not in object-storage"
                     .to_string()
@@ -118,7 +151,7 @@ pub async fn update_snapshot(
             ));
         };
         manifest.apply_change(change);
-        storage.put_manifest(&path, manifest).await?;
+        storage.put_manifest(&manifest_path, manifest).await?;
     } else {
         let lower_bound = lower_bound.date_naive().and_time(NaiveTime::MIN).and_utc();
         let upper_bound = lower_bound
@@ -148,7 +181,7 @@ pub async fn update_snapshot(
             time_upper_bound: upper_bound,
         };
         manifests.push(new_snapshot_entriy);
-        storage.put_snapshot(stream_name, meta).await?;
+        storage.put_snapshot(stream_name, meta.snapshot).await?;
     }
 
     Ok(())
@@ -160,13 +193,13 @@ pub async fn remove_manifest_from_snapshot(
     dates: Vec<String>,
 ) -> Result<(), ObjectStorageError> {
     // get current snapshot
-    let mut meta = storage.get_snapshot(stream_name).await?;
-    let manifests = &mut meta.manifest_list;
+    let mut meta = storage.get_object_store_format(stream_name).await?;
+    let manifests = &mut meta.snapshot.manifest_list;
 
     // Filter out items whose manifest_path contains any of the dates_to_delete
     manifests.retain(|item| !dates.iter().any(|date| item.manifest_path.contains(date)));
 
-    storage.put_snapshot(stream_name, meta).await?;
+    storage.put_snapshot(stream_name, meta.snapshot).await?;
 
     Ok(())
 }
@@ -175,8 +208,8 @@ pub async fn get_first_event(
     stream_name: &str,
 ) -> Result<Option<String>, ObjectStorageError> {
     // get current snapshot
-    let mut meta = storage.get_snapshot(stream_name).await?;
-    let manifests = &mut meta.manifest_list;
+    let mut meta = storage.get_object_store_format(stream_name).await?;
+    let manifests = &mut meta.snapshot.manifest_list;
 
     if manifests.is_empty() {
         log::info!("No manifest found for stream {stream_name}");
@@ -199,7 +232,7 @@ pub async fn get_first_event(
     };
 
     if let Some(first_event) = manifest.files.first() {
-        let (lower_bound, _) = get_file_bounds(first_event);
+        let (lower_bound, _) = get_file_bounds(first_event, DEFAULT_TIMESTAMP_KEY.to_string());
        let first_event_at = lower_bound.with_timezone(&Local).to_rfc3339();
         return Ok(Some(first_event_at));
     }
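The new `String` branch of `get_file_bounds` assumes the partition column's min/max statistics are RFC 3339 strings that chrono can parse directly. A minimal standalone sketch of that assumption (the sample values are hypothetical):

```rust
use chrono::{DateTime, Utc};

// Mirrors the String branch above: manifest min/max stats for the
// user-chosen partition column are parsed as RFC 3339 timestamps.
fn bounds_from_string_stats(min: &str, max: &str) -> (DateTime<Utc>, DateTime<Utc>) {
    (
        min.parse::<DateTime<Utc>>().expect("stat is RFC 3339"),
        max.parse::<DateTime<Utc>>().expect("stat is RFC 3339"),
    )
}

fn main() {
    let (lo, hi) = bounds_from_string_stats("2024-03-01T00:00:00Z", "2024-03-01T23:59:59Z");
    assert!(lo < hi);
}
```

If a stored value were not RFC 3339 the `unwrap` in the real code would panic, which is why the ingest path later in this diff validates the field before anything is written.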
diff --git a/server/src/catalog/manifest.rs b/server/src/catalog/manifest.rs
index bafed3dd5..ad5b32422 100644
--- a/server/src/catalog/manifest.rs
+++ b/server/src/catalog/manifest.rs
@@ -112,7 +112,6 @@ pub fn create_from_parquet_file(
     let columns = column_statistics(row_groups);
     manifest_file.columns = columns.into_values().collect();
     let mut sort_orders = sort_order(row_groups);
-
     if let Some(last_sort_order) = sort_orders.pop() {
         if sort_orders
             .into_iter()
@@ -155,7 +154,7 @@ fn sort_order(
             })
             .collect_vec();
 
-        sort_orders.push(sort_order)
+        sort_orders.push(sort_order);
     }
     sort_orders
 }
diff --git a/server/src/event.rs b/server/src/event.rs
index 62db832bf..c517cdbda 100644
--- a/server/src/event.rs
+++ b/server/src/event.rs
@@ -26,10 +26,10 @@ use itertools::Itertools;
 
 use std::sync::Arc;
 
-use crate::metadata;
-
 use self::error::EventError;
 pub use self::writer::STREAM_WRITERS;
+use crate::metadata;
+use chrono::NaiveDateTime;
 
 pub const DEFAULT_TIMESTAMP_KEY: &str = "p_timestamp";
 pub const DEFAULT_TAGS_KEY: &str = "p_tags";
@@ -42,6 +42,7 @@ pub struct Event {
     pub origin_format: &'static str,
     pub origin_size: u64,
     pub is_first_event: bool,
+    pub parsed_timestamp: NaiveDateTime,
 }
 
 // Events holds the schema related to a each event for a single log stream
@@ -54,7 +55,12 @@ impl Event {
             commit_schema(&self.stream_name, self.rb.schema())?;
         }
 
-        Self::process_event(&self.stream_name, &key, self.rb.clone())?;
+        Self::process_event(
+            &self.stream_name,
+            &key,
+            self.rb.clone(),
+            self.parsed_timestamp,
+        )?;
 
         metadata::STREAM_INFO.update_stats(
             &self.stream_name,
@@ -81,8 +87,9 @@ impl Event {
         stream_name: &str,
         schema_key: &str,
         rb: RecordBatch,
+        parsed_timestamp: NaiveDateTime,
     ) -> Result<(), EventError> {
-        STREAM_WRITERS.append_to_local(stream_name, schema_key, rb)?;
+        STREAM_WRITERS.append_to_local(stream_name, schema_key, rb, parsed_timestamp)?;
         Ok(())
     }
 }
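Every `Event` now carries a `parsed_timestamp` alongside the record batch. For streams without a time partition, the ingest handler further down in this diff falls back to the server clock; a sketch of that default:

```rust
use chrono::{NaiveDateTime, Utc};

// Fallback used by the ingest path when no time partition is configured:
// the event is stamped with the server's current UTC wall-clock time.
fn default_parsed_timestamp() -> NaiveDateTime {
    Utc::now().naive_utc()
}

fn main() {
    println!("parsed_timestamp fallback: {}", default_parsed_timestamp());
}
```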
diff --git a/server/src/event/format.rs b/server/src/event/format.rs
index 55f8e106a..cd11e440b 100644
--- a/server/src/event/format.rs
+++ b/server/src/event/format.rs
@@ -53,14 +53,14 @@ pub trait EventFormat: Sized {
             return Err(anyhow!("field {} is a reserved field", DEFAULT_TAGS_KEY));
         };
 
-        if get_field(&schema, DEFAULT_TAGS_KEY).is_some() {
+        if get_field(&schema, DEFAULT_METADATA_KEY).is_some() {
             return Err(anyhow!(
                 "field {} is a reserved field",
                 DEFAULT_METADATA_KEY
             ));
         };
 
-        if get_field(&schema, DEFAULT_TAGS_KEY).is_some() {
+        if get_field(&schema, DEFAULT_TIMESTAMP_KEY).is_some() {
             return Err(anyhow!(
                 "field {} is a reserved field",
                 DEFAULT_TIMESTAMP_KEY
diff --git a/server/src/event/writer.rs b/server/src/event/writer.rs
index 2d1b46d4e..f1befd061 100644
--- a/server/src/event/writer.rs
+++ b/server/src/event/writer.rs
@@ -30,7 +30,7 @@ use crate::utils;
 use self::{errors::StreamWriterError, file_writer::FileWriter, mem_writer::MemWriter};
 use arrow_array::{RecordBatch, TimestampMillisecondArray};
 use arrow_schema::Schema;
-use chrono::Utc;
+use chrono::{NaiveDateTime, Utc};
 use derive_more::{Deref, DerefMut};
 use once_cell::sync::Lazy;
 
@@ -48,6 +48,7 @@ impl Writer {
         stream_name: &str,
         schema_key: &str,
         rb: RecordBatch,
+        parsed_timestamp: NaiveDateTime,
     ) -> Result<(), StreamWriterError> {
         let rb = utils::arrow::replace_columns(
             rb.schema(),
@@ -56,7 +57,8 @@ impl Writer {
             &[Arc::new(get_timestamp_array(rb.num_rows()))],
         );
 
-        self.disk.push(stream_name, schema_key, &rb)?;
+        self.disk
+            .push(stream_name, schema_key, &rb, parsed_timestamp)?;
         self.mem.push(schema_key, rb);
         Ok(())
     }
@@ -72,15 +74,18 @@ impl WriterTable {
         stream_name: &str,
         schema_key: &str,
         record: RecordBatch,
+        parsed_timestamp: NaiveDateTime,
     ) -> Result<(), StreamWriterError> {
         let hashmap_guard = self.read().unwrap();
 
         match hashmap_guard.get(stream_name) {
             Some(stream_writer) => {
-                stream_writer
-                    .lock()
-                    .unwrap()
-                    .push(stream_name, schema_key, record)?;
+                stream_writer.lock().unwrap().push(
+                    stream_name,
+                    schema_key,
+                    record,
+                    parsed_timestamp,
+                )?;
             }
             None => {
                 drop(hashmap_guard);
@@ -88,13 +93,15 @@ impl WriterTable {
                 // check for race condition
                 // if map contains entry then just
                 if let Some(writer) = map.get(stream_name) {
-                    writer
-                        .lock()
-                        .unwrap()
-                        .push(stream_name, schema_key, record)?;
+                    writer.lock().unwrap().push(
+                        stream_name,
+                        schema_key,
+                        record,
+                        parsed_timestamp,
+                    )?;
                 } else {
                     let mut writer = Writer::default();
-                    writer.push(stream_name, schema_key, record)?;
+                    writer.push(stream_name, schema_key, record, parsed_timestamp)?;
                     map.insert(stream_name.to_owned(), Mutex::new(writer));
                 }
             }
diff --git a/server/src/event/writer/file_writer.rs b/server/src/event/writer/file_writer.rs
index 9ff62c5c3..47590a119 100644
--- a/server/src/event/writer/file_writer.rs
+++ b/server/src/event/writer/file_writer.rs
@@ -17,13 +17,13 @@
  *
  */
 
-use std::collections::HashMap;
-use std::fs::{File, OpenOptions};
-use std::path::PathBuf;
-
 use arrow_array::RecordBatch;
 use arrow_ipc::writer::StreamWriter;
+use chrono::NaiveDateTime;
 use derive_more::{Deref, DerefMut};
+use std::collections::HashMap;
+use std::fs::{File, OpenOptions};
+use std::path::PathBuf;
 
 use crate::storage::staging::StorageDir;
 
@@ -44,27 +44,17 @@ impl FileWriter {
         stream_name: &str,
         schema_key: &str,
         record: &RecordBatch,
+        parsed_timestamp: NaiveDateTime,
     ) -> Result<(), StreamWriterError> {
-        match self.get_mut(schema_key) {
-            Some(writer) => {
-                writer
-                    .writer
-                    .write(record)
-                    .map_err(StreamWriterError::Writer)?;
-            }
-            // entry is not present thus we create it
-            None => {
-                // this requires mutable borrow of the map so we drop this read lock and wait for write lock
-                let (path, writer) = init_new_stream_writer_file(stream_name, schema_key, record)?;
-                self.insert(
-                    schema_key.to_owned(),
-                    ArrowWriter {
-                        file_path: path,
-                        writer,
-                    },
-                );
-            }
-        };
+        let (path, writer) =
+            init_new_stream_writer_file(stream_name, schema_key, record, parsed_timestamp)?;
+        self.insert(
+            schema_key.to_owned(),
+            ArrowWriter {
+                file_path: path,
+                writer,
+            },
+        );
 
         Ok(())
     }
@@ -80,10 +70,10 @@ fn init_new_stream_writer_file(
     stream_name: &str,
     schema_key: &str,
     record: &RecordBatch,
+    parsed_timestamp: NaiveDateTime,
 ) -> Result<(PathBuf, StreamWriter<File>), StreamWriterError> {
     let dir = StorageDir::new(stream_name);
-    let path = dir.path_by_current_time(schema_key);
-
+    let path = dir.path_by_current_time(schema_key, parsed_timestamp);
     std::fs::create_dir_all(dir.data_path)?;
 
     let file = OpenOptions::new().create(true).append(true).open(&path)?;
@@ -94,6 +84,5 @@ fn init_new_stream_writer_file(
     stream_writer
         .write(record)
         .map_err(StreamWriterError::Writer)?;
-
     Ok((path, stream_writer))
 }
diff --git a/server/src/handlers.rs b/server/src/handlers.rs
index 81beea0bd..456f5fc68 100644
--- a/server/src/handlers.rs
+++ b/server/src/handlers.rs
@@ -23,7 +23,7 @@ const PREFIX_TAGS: &str = "x-p-tag-";
 const PREFIX_META: &str = "x-p-meta-";
 const STREAM_NAME_HEADER_KEY: &str = "x-p-stream";
 const LOG_SOURCE_KEY: &str = "x-p-log-source";
-
+const TIME_PARTITION_KEY: &str = "x-p-time-partition";
 const AUTHORIZATION_KEY: &str = "authorization";
 const SEPARATOR: char = '^';
diff --git a/server/src/handlers/http/ingest.rs b/server/src/handlers/http/ingest.rs
index 2a3281843..1f09e9a07 100644
--- a/server/src/handlers/http/ingest.rs
+++ b/server/src/handlers/http/ingest.rs
@@ -16,14 +16,6 @@
  *
  */
 
-use actix_web::{http::header::ContentType, HttpRequest, HttpResponse};
-use arrow_schema::Field;
-use bytes::Bytes;
-use http::StatusCode;
-use serde_json::Value;
-use std::collections::{BTreeMap, HashMap};
-use std::sync::Arc;
-
 use crate::event::error::EventError;
 use crate::event::format::EventFormat;
 use crate::event::{self, format};
@@ -32,11 +24,20 @@ use crate::handlers::{
     STREAM_NAME_HEADER_KEY,
 };
 use crate::metadata::STREAM_INFO;
+use crate::option::CONFIG;
 use crate::utils::header_parsing::{collect_labelled_headers, ParseHeaderError};
+use crate::utils::json::convert_array_to_object;
+use actix_web::{http::header::ContentType, HttpRequest, HttpResponse};
+use arrow_schema::Field;
+use bytes::Bytes;
+use chrono::{DateTime, Utc};
+use http::StatusCode;
+use serde_json::Value;
+use std::collections::{BTreeMap, HashMap};
+use std::sync::Arc;
 
 use super::logstream::error::CreateStreamError;
 use super::{kinesis, otel};
-
 // Handler for POST /api/v1/ingest
 // ingests events by extracting stream name from header
 // creates if stream does not exist
@@ -94,45 +95,109 @@ pub async fn post_event(req: HttpRequest, body: Bytes) -> Result<HttpResponse, PostError> {
 async fn push_logs(stream_name: String, req: HttpRequest, body: Bytes) -> Result<(), PostError> {
-    let (size, rb, is_first_event) = {
-        let hash_map = STREAM_INFO.read().unwrap();
-        let schema = hash_map
-            .get(&stream_name)
-            .ok_or(PostError::StreamNotFound(stream_name.clone()))?
-            .schema
-            .clone();
-        into_event_batch(req, body, schema)?
-    };
-
-    event::Event {
-        rb,
-        stream_name,
-        origin_format: "json",
-        origin_size: size as u64,
-        is_first_event,
+    let glob_storage = CONFIG.storage().get_object_store();
+    let object_store_format = glob_storage
+        .get_object_store_format(&stream_name)
+        .await
+        .map_err(|_err| PostError::StreamNotFound(stream_name.clone()))?;
+    let time_partition = object_store_format.time_partition;
+    let body_val: Value = serde_json::from_slice(&body)?;
+    let size: usize = body.len();
+    let mut parsed_timestamp = Utc::now().naive_utc();
+    if time_partition.is_none() {
+        let stream = stream_name.clone();
+        let (rb, is_first_event) = get_stream_schema(stream.clone(), req, body_val)?;
+        event::Event {
+            rb,
+            stream_name: stream,
+            origin_format: "json",
+            origin_size: size as u64,
+            is_first_event,
+            parsed_timestamp,
+        }
+        .process()
+        .await?;
+    } else {
+        let data = convert_array_to_object(body_val.clone())?;
+        for value in data {
+            let body_timestamp = value.get(&time_partition.clone().unwrap().to_string());
+            if body_timestamp.is_some() {
+                if body_timestamp
+                    .unwrap()
+                    .to_owned()
+                    .as_str()
+                    .unwrap()
+                    .parse::<DateTime<Utc>>()
+                    .is_ok()
+                {
+                    parsed_timestamp = body_timestamp
+                        .unwrap()
+                        .to_owned()
+                        .as_str()
+                        .unwrap()
+                        .parse::<DateTime<Utc>>()
+                        .unwrap()
+                        .naive_utc();
+
+                    let (rb, is_first_event) =
+                        get_stream_schema(stream_name.clone(), req.clone(), value)?;
+                    event::Event {
+                        rb,
+                        stream_name: stream_name.clone(),
+                        origin_format: "json",
+                        origin_size: size as u64,
+                        is_first_event,
+                        parsed_timestamp,
+                    }
+                    .process()
+                    .await?;
+                } else {
+                    return Err(PostError::Invalid(anyhow::Error::msg(format!(
+                        "field {} is not in the correct datetime format",
+                        body_timestamp.unwrap().to_owned().as_str().unwrap()
+                    ))));
+                }
+            } else {
+                return Err(PostError::Invalid(anyhow::Error::msg(format!(
+                    "ingestion failed as field {} is not part of the log",
+                    time_partition.unwrap()
+                ))));
+            }
+        }
     }
-    .process()
-    .await?;
-
     Ok(())
 }
 
+fn get_stream_schema(
+    stream_name: String,
+    req: HttpRequest,
+    body: Value,
+) -> Result<(arrow_array::RecordBatch, bool), PostError> {
+    let hash_map = STREAM_INFO.read().unwrap();
+    let schema = hash_map
+        .get(&stream_name)
+        .ok_or(PostError::StreamNotFound(stream_name))?
+        .schema
+        .clone();
+    into_event_batch(req, body, schema)
+}
+
 fn into_event_batch(
     req: HttpRequest,
-    body: Bytes,
+    body: Value,
     schema: HashMap<String, Arc<Field>>,
-) -> Result<(usize, arrow_array::RecordBatch, bool), PostError> {
+) -> Result<(arrow_array::RecordBatch, bool), PostError> {
     let tags = collect_labelled_headers(&req, PREFIX_TAGS, SEPARATOR)?;
     let metadata = collect_labelled_headers(&req, PREFIX_META, SEPARATOR)?;
-    let size = body.len();
-    let body: Value = serde_json::from_slice(&body)?;
+
     let event = format::json::Event {
         data: body,
         tags,
         metadata,
     };
     let (rb, is_first) = event.into_recordbatch(schema)?;
-    Ok((size, rb, is_first))
+
+    Ok((rb, is_first))
 }
 
 // Check if the stream exists and create a new stream if doesn't exist
@@ -140,7 +205,7 @@ pub async fn create_stream_if_not_exists(stream_name: &str) -> Result<(), PostError> {
     if STREAM_INFO.stream_exists(stream_name) {
         return Ok(());
     }
-    super::logstream::create_stream(stream_name.to_string()).await?;
+    super::logstream::create_stream(stream_name.to_string(), "").await?;
     Ok(())
 }
 
@@ -192,7 +257,6 @@ mod tests {
         types::Int64Type, ArrayRef, Float64Array, Int64Array, ListArray, StringArray,
     };
     use arrow_schema::{DataType, Field};
-    use bytes::Bytes;
     use serde_json::json;
 
     use crate::{
@@ -239,14 +303,8 @@ mod tests {
             .append_header((PREFIX_META.to_string() + "C", "meta1"))
             .to_http_request();
 
-        let (size, rb, _) = into_event_batch(
-            req,
-            Bytes::from(serde_json::to_vec(&json).unwrap()),
-            HashMap::default(),
-        )
-        .unwrap();
+        let (rb, ..) = into_event_batch(req, json, HashMap::default()).unwrap();
 
-        assert_eq!(size, 28);
         assert_eq!(rb.num_rows(), 1);
         assert_eq!(rb.num_columns(), 6);
         assert_eq!(
@@ -285,12 +343,7 @@ mod tests {
 
         let req = TestRequest::default().to_http_request();
 
-        let (_, rb, _) = into_event_batch(
-            req,
-            Bytes::from(serde_json::to_vec(&json).unwrap()),
-            HashMap::default(),
-        )
-        .unwrap();
+        let (rb, ..) = into_event_batch(req, json, HashMap::default()).unwrap();
 
         assert_eq!(rb.num_rows(), 1);
         assert_eq!(rb.num_columns(), 5);
@@ -322,8 +375,7 @@ mod tests {
 
         let req = TestRequest::default().to_http_request();
 
-        let (_, rb, _) =
-            into_event_batch(req, Bytes::from(serde_json::to_vec(&json).unwrap()), schema).unwrap();
+        let (rb, ..) = into_event_batch(req, json, schema).unwrap();
 
         assert_eq!(rb.num_rows(), 1);
         assert_eq!(rb.num_columns(), 5);
@@ -355,10 +407,7 @@ mod tests {
 
         let req = TestRequest::default().to_http_request();
 
-        assert!(
-            into_event_batch(req, Bytes::from(serde_json::to_vec(&json).unwrap()), schema,)
-                .is_err()
-        );
+        assert!(into_event_batch(req, json, schema,).is_err());
     }
 
     #[test]
@@ -376,8 +425,7 @@ mod tests {
 
         let req = TestRequest::default().to_http_request();
 
-        let (_, rb, _) =
-            into_event_batch(req, Bytes::from(serde_json::to_vec(&json).unwrap()), schema).unwrap();
+        let (rb, ..) = into_event_batch(req, json, schema).unwrap();
 
         assert_eq!(rb.num_rows(), 1);
         assert_eq!(rb.num_columns(), 3);
@@ -389,12 +437,7 @@ mod tests {
 
         let req = TestRequest::default().to_http_request();
 
-        assert!(into_event_batch(
-            req,
-            Bytes::from(serde_json::to_vec(&json).unwrap()),
-            HashMap::default(),
-        )
-        .is_err())
+        assert!(into_event_batch(req, json, HashMap::default(),).is_err())
     }
 
     #[test]
@@ -417,12 +460,7 @@ mod tests {
 
         let req = TestRequest::default().to_http_request();
 
-        let (_, rb, _) = into_event_batch(
-            req,
-            Bytes::from(serde_json::to_vec(&json).unwrap()),
-            HashMap::default(),
-        )
-        .unwrap();
+        let (rb, ..) = into_event_batch(req, json, HashMap::default()).unwrap();
 
         assert_eq!(rb.num_rows(), 3);
         assert_eq!(rb.num_columns(), 6);
@@ -470,12 +508,7 @@ mod tests {
 
         let req = TestRequest::default().to_http_request();
 
-        let (_, rb, _) = into_event_batch(
-            req,
-            Bytes::from(serde_json::to_vec(&json).unwrap()),
-            HashMap::default(),
-        )
-        .unwrap();
+        let (rb, ..) = into_event_batch(req, json, HashMap::default()).unwrap();
 
         assert_eq!(rb.num_rows(), 3);
         assert_eq!(rb.num_columns(), 6);
@@ -523,8 +556,7 @@ mod tests {
         );
         let req = TestRequest::default().to_http_request();
 
-        let (_, rb, _) =
-            into_event_batch(req, Bytes::from(serde_json::to_vec(&json).unwrap()), schema).unwrap();
+        let (rb, ..) = into_event_batch(req, json, schema).unwrap();
 
         assert_eq!(rb.num_rows(), 3);
         assert_eq!(rb.num_columns(), 6);
@@ -564,12 +596,7 @@ mod tests {
 
         let req = TestRequest::default().to_http_request();
 
-        let (_, rb, _) = into_event_batch(
-            req,
-            Bytes::from(serde_json::to_vec(&json).unwrap()),
-            HashMap::default(),
-        )
-        .unwrap();
+        let (rb, ..) = into_event_batch(req, json, HashMap::default()).unwrap();
 
         assert_eq!(rb.num_rows(), 3);
         assert_eq!(rb.num_columns(), 5);
@@ -614,10 +641,7 @@ mod tests {
             .into_iter(),
         );
 
-        assert!(
-            into_event_batch(req, Bytes::from(serde_json::to_vec(&json).unwrap()), schema,)
-                .is_err()
-        );
+        assert!(into_event_batch(req, json, schema,).is_err());
     }
 
     #[test]
@@ -645,12 +669,7 @@ mod tests {
 
         let req = TestRequest::default().to_http_request();
 
-        let (_, rb, _) = into_event_batch(
-            req,
-            Bytes::from(serde_json::to_vec(&json).unwrap()),
-            HashMap::default(),
-        )
-        .unwrap();
+        let (rb, ..) = into_event_batch(req, json, HashMap::default()).unwrap();
 
         assert_eq!(rb.num_rows(), 4);
         assert_eq!(rb.num_columns(), 7);
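The loop in `push_logs` enforces two invariants per event when a time partition is set: the field must be present, and it must parse as RFC 3339. A standalone sketch of that check (the field name `timestamp` and the sample payload are hypothetical):

```rust
use chrono::{DateTime, NaiveDateTime, Utc};
use serde_json::{json, Value};

// Sketch of the per-event validation above: missing field and malformed
// value each produce the corresponding error message from the diff.
fn parse_partition_field(value: &Value, time_partition: &str) -> Result<NaiveDateTime, String> {
    let raw = value
        .get(time_partition)
        .and_then(|v| v.as_str())
        .ok_or_else(|| {
            format!("ingestion failed as field {time_partition} is not part of the log")
        })?;
    raw.parse::<DateTime<Utc>>()
        .map(|dt| dt.naive_utc())
        .map_err(|_| format!("field {raw} is not in the correct datetime format"))
}

fn main() {
    let event = json!({"timestamp": "2024-03-01T10:15:00Z", "level": "info"});
    println!("{:?}", parse_partition_field(&event, "timestamp"));
}
```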
diff --git a/server/src/handlers/http/logstream.rs b/server/src/handlers/http/logstream.rs
index e31a4d44d..20fd6330e 100644
--- a/server/src/handlers/http/logstream.rs
+++ b/server/src/handlers/http/logstream.rs
@@ -23,16 +23,15 @@ use actix_web::{web, HttpRequest, Responder};
 use chrono::Utc;
 use serde_json::Value;
 
+use self::error::{CreateStreamError, StreamError};
 use crate::alerts::Alerts;
+use crate::handlers::TIME_PARTITION_KEY;
 use crate::metadata::STREAM_INFO;
 use crate::option::CONFIG;
-use crate::storage::retention::Retention;
-use crate::storage::{LogStream, StorageDir};
+use crate::storage::{retention::Retention, LogStream, StorageDir};
 use crate::{catalog, event, stats};
 use crate::{metadata, validator};
 
-use self::error::{CreateStreamError, StreamError};
-
 pub async fn delete(req: HttpRequest) -> Result<impl Responder, StreamError> {
     let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
 
@@ -110,6 +109,15 @@ pub async fn get_alert(req: HttpRequest) -> Result<impl Responder, StreamError>
 }
 
 pub async fn put_stream(req: HttpRequest) -> Result<impl Responder, StreamError> {
+    let time_partition = if let Some((_, time_partition_name)) = req
+        .headers()
+        .iter()
+        .find(|&(key, _)| key == TIME_PARTITION_KEY)
+    {
+        time_partition_name.to_str().unwrap()
+    } else {
+        ""
+    };
     let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
 
     if metadata::STREAM_INFO.stream_exists(&stream_name) {
@@ -121,7 +129,7 @@ pub async fn put_stream(req: HttpRequest) -> Result<impl Responder, StreamError>
             status: StatusCode::BAD_REQUEST,
         });
     } else {
-        create_stream(stream_name).await?;
+        create_stream(stream_name, time_partition).await?;
     }
 
     Ok(("log stream created", StatusCode::OK))
@@ -328,13 +336,16 @@ fn remove_id_from_alerts(value: &mut Value) {
     }
 }
 
-pub async fn create_stream(stream_name: String) -> Result<(), CreateStreamError> {
+pub async fn create_stream(
+    stream_name: String,
+    time_partition: &str,
+) -> Result<(), CreateStreamError> {
     // fail to proceed if invalid stream name
     validator::stream_name(&stream_name)?;
 
     // Proceed to create log stream if it doesn't exist
     let storage = CONFIG.storage().get_object_store();
-    if let Err(err) = storage.create_stream(&stream_name).await {
+    if let Err(err) = storage.create_stream(&stream_name, time_partition).await {
         return Err(CreateStreamError::Storage { stream_name, err });
     }
 
@@ -343,9 +354,14 @@ pub async fn create_stream(stream_name: String) -> Result<(), CreateStreamError>
         .get_object_store()
         .get_stream_metadata(&stream_name)
         .await;
-    let created_at = stream_meta.unwrap().created_at;
-
-    metadata::STREAM_INFO.add_stream(stream_name.to_string(), created_at);
+    let stream_meta = stream_meta.unwrap();
+    let created_at = stream_meta.created_at;
+
+    metadata::STREAM_INFO.add_stream(
+        stream_name.to_string(),
+        created_at,
+        time_partition.to_string(),
+    );
 
     Ok(())
 }
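With the new header wired through `put_stream`, a client opts into time partitioning at stream creation. A hedged sketch of that call, assuming a local server on the default port and the `reqwest`/`tokio` crates (illustration only, not part of this codebase):

```rust
use reqwest::Client;

// Creates stream "demo" whose time partition is the event field "timestamp".
// URL, credentials, and stream/field names are assumptions for illustration.
#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let resp = Client::new()
        .put("http://localhost:8000/api/v1/logstream/demo")
        .basic_auth("admin", Some("admin"))
        .header("x-p-time-partition", "timestamp")
        .send()
        .await?;
    println!("{}", resp.status());
    Ok(())
}
```

Note the design choice: an absent header maps to the empty string, which downstream becomes `time_partition: None`, so existing clients are unaffected.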
diff --git a/server/src/handlers/http/query.rs b/server/src/handlers/http/query.rs
index af5120c49..d7896b944 100644
--- a/server/src/handlers/http/query.rs
+++ b/server/src/handlers/http/query.rs
@@ -56,7 +56,6 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpResponse, QueryError> {
diff --git a/server/src/metadata.rs b/server/src/metadata.rs
--- a/server/src/metadata.rs
+++ b/server/src/metadata.rs
@@ ... @@ pub struct LogStreamMetadata {
     pub first_event_at: Option<String>,
+    pub time_partition: Option<String>,
 }
 
 // It is very unlikely that panic will occur when dealing with metadata.
@@ -142,13 +143,18 @@ impl StreamInfo {
         })
     }
 
-    pub fn add_stream(&self, stream_name: String, created_at: String) {
+    pub fn add_stream(&self, stream_name: String, created_at: String, time_partition: String) {
         let mut map = self.write().expect(LOCK_EXPECT);
         let metadata = LogStreamMetadata {
             created_at: if created_at.is_empty() {
                 Local::now().to_rfc3339()
             } else {
-                created_at.clone()
+                created_at
+            },
+            time_partition: if time_partition.is_empty() {
+                None
+            } else {
+                Some(time_partition)
             },
             ..Default::default()
         };
@@ -185,6 +191,7 @@ impl StreamInfo {
             cache_enabled: meta.cache_enabled,
             created_at: meta.created_at,
             first_event_at: meta.first_event_at,
+            time_partition: meta.time_partition,
         };
 
         let mut map = self.write().expect(LOCK_EXPECT);
diff --git a/server/src/query.rs b/server/src/query.rs
index e3f9d8dbc..120143f6a 100644
--- a/server/src/query.rs
+++ b/server/src/query.rs
@@ -43,14 +43,15 @@ use crate::option::CONFIG;
 use crate::storage::{ObjectStorageProvider, StorageDir};
 
 use self::error::ExecuteError;
-
 use self::stream_schema_provider::GlobalSchemaProvider;
 pub use self::stream_schema_provider::PartialTimeFilter;
+use crate::metadata::STREAM_INFO;
 
 pub static QUERY_SESSION: Lazy<SessionContext> =
     Lazy::new(|| Query::create_session_context(CONFIG.storage()));
 
 // A query request by client
+#[derive(Debug)]
 pub struct Query {
     pub raw_logical_plan: LogicalPlan,
     pub start: DateTime<Utc>,
     pub end: DateTime<Utc>,
@@ -102,9 +103,12 @@ impl Query {
         SessionContext::new_with_state(state)
     }
 
-    pub async fn execute(&self) -> Result<(Vec<RecordBatch>, Vec<String>), ExecuteError> {
+    pub async fn execute(
+        &self,
+        stream_name: String,
+    ) -> Result<(Vec<RecordBatch>, Vec<String>), ExecuteError> {
         let df = QUERY_SESSION
-            .execute_logical_plan(self.final_logical_plan())
+            .execute_logical_plan(self.final_logical_plan(stream_name))
             .await?;
 
         let fields = df
@@ -120,7 +124,7 @@ impl Query {
     }
 
     /// return logical plan with all time filters applied through
-    fn final_logical_plan(&self) -> LogicalPlan {
+    fn final_logical_plan(&self, stream_name: String) -> LogicalPlan {
         let filters = self.filter_tag.clone().and_then(tag_filter);
         // see https://github.com/apache/arrow-datafusion/pull/8400
        // this can be eliminated in later version of datafusion but with slight caveat
@@ -133,6 +137,7 @@ impl Query {
                     self.start.naive_utc(),
                     self.end.naive_utc(),
                     filters,
+                    stream_name,
                 );
                 LogicalPlan::Explain(Explain {
                     verbose: plan.verbose,
@@ -144,7 +149,13 @@ impl Query {
                     logical_optimization_succeeded: plan.logical_optimization_succeeded,
                 })
             }
-            x => transform(x, self.start.naive_utc(), self.end.naive_utc(), filters),
+            x => transform(
+                x,
+                self.start.naive_utc(),
+                self.end.naive_utc(),
+                filters,
+                stream_name,
+            ),
         }
     }
 
@@ -195,33 +206,63 @@ fn transform(
     start_time: NaiveDateTime,
     end_time: NaiveDateTime,
     filters: Option<Expr>,
+    stream_name: String,
 ) -> LogicalPlan {
     plan.transform(&|plan| match plan {
         LogicalPlan::TableScan(table) => {
+            let hash_map = STREAM_INFO.read().unwrap();
+            let time_partition = hash_map
+                .get(&stream_name)
+                .ok_or(DataFusionError::Execution(format!(
+                    "stream not found {}",
+                    stream_name.clone()
+                )))?
+                .time_partition
+                .clone();
+
             let mut new_filters = vec![];
             if !table_contains_any_time_filters(&table) {
-                let start_time_filter = PartialTimeFilter::Low(std::ops::Bound::Included(
-                    start_time,
-                ))
-                .binary_expr(Expr::Column(Column::new(
-                    Some(table.table_name.to_owned_reference()),
-                    event::DEFAULT_TIMESTAMP_KEY,
-                )));
-                let end_time_filter = PartialTimeFilter::High(std::ops::Bound::Excluded(end_time))
-                    .binary_expr(Expr::Column(Column::new(
-                        Some(table.table_name.to_owned_reference()),
-                        event::DEFAULT_TIMESTAMP_KEY,
-                    )));
-                new_filters.push(start_time_filter);
-                new_filters.push(end_time_filter);
+                let mut _start_time_filter: Expr;
+                let mut _end_time_filter: Expr;
+                match time_partition {
+                    Some(time_partition) => {
+                        _start_time_filter =
+                            PartialTimeFilter::Low(std::ops::Bound::Included(start_time))
+                                .binary_expr_timestamp_partition_key(Expr::Column(Column::new(
+                                    Some(table.table_name.to_owned_reference()),
+                                    time_partition.clone(),
+                                )));
+                        _end_time_filter =
+                            PartialTimeFilter::High(std::ops::Bound::Excluded(end_time))
+                                .binary_expr_timestamp_partition_key(Expr::Column(Column::new(
+                                    Some(table.table_name.to_owned_reference()),
+                                    time_partition,
+                                )));
+                    }
+                    None => {
+                        _start_time_filter =
+                            PartialTimeFilter::Low(std::ops::Bound::Included(start_time))
+                                .binary_expr_default_timestamp_key(Expr::Column(Column::new(
+                                    Some(table.table_name.to_owned_reference()),
+                                    event::DEFAULT_TIMESTAMP_KEY,
+                                )));
+                        _end_time_filter =
+                            PartialTimeFilter::High(std::ops::Bound::Excluded(end_time))
+                                .binary_expr_default_timestamp_key(Expr::Column(Column::new(
+                                    Some(table.table_name.to_owned_reference()),
+                                    event::DEFAULT_TIMESTAMP_KEY,
+                                )));
+                    }
+                }
+
+                new_filters.push(_start_time_filter);
+                new_filters.push(_end_time_filter);
             }
             if let Some(tag_filters) = filters.clone() {
                 new_filters.push(tag_filters)
             }
-
             let new_filter = new_filters.into_iter().reduce(and);
-
             if let Some(new_filter) = new_filter {
                 let filter =
                     Filter::try_new(new_filter, Arc::new(LogicalPlan::TableScan(table))).unwrap();
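The split into `binary_expr_default_timestamp_key` and `binary_expr_timestamp_partition_key` (defined in stream_schema_provider.rs below) yields two literal shapes: a millisecond timestamp for `p_timestamp`, and a Utf8 string for a user-chosen column. A sketch of the two resulting expressions, assuming datafusion's prelude (column names and values hypothetical):

```rust
use datafusion::prelude::{col, lit};
use datafusion::scalar::ScalarValue;

fn main() {
    // Default key: compare against a TimestampMillisecond literal.
    let default_key = col("p_timestamp").gt_eq(lit(ScalarValue::TimestampMillisecond(
        Some(1_709_251_200_000),
        None,
    )));
    // Custom partition column: compare against a Utf8 literal, matching the
    // `format!("{:?}", ...)` rendering of a NaiveDateTime in the new helper.
    let partition_key = col("timestamp").gt_eq(lit("2024-03-01T00:00:00"));
    println!("{default_key:?}\n{partition_key:?}");
}
```

String comparison only works here because RFC 3339 timestamps sort lexicographically the same way they sort chronologically.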
diff --git a/server/src/query/stream_schema_provider.rs b/server/src/query/stream_schema_provider.rs
index 874afca78..3a99b3623 100644
--- a/server/src/query/stream_schema_provider.rs
+++ b/server/src/query/stream_schema_provider.rs
@@ -40,7 +40,7 @@ use datafusion::{
     optimizer::utils::conjunction,
     physical_expr::{create_physical_expr, PhysicalSortExpr},
     physical_plan::{self, empty::EmptyExec, union::UnionExec, ExecutionPlan, Statistics},
-    prelude::{Column, Expr},
+    prelude::Expr,
     scalar::ScalarValue,
 };
 use futures_util::{stream::FuturesOrdered, StreamExt, TryFutureExt, TryStreamExt};
@@ -287,13 +287,24 @@ impl TableProvider for StandardTableProvider {
     ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
         let mut memory_exec = None;
         let mut cache_exec = None;
+        let object_store = state
+            .runtime_env()
+            .object_store_registry
+            .get_store(&self.url)
+            .unwrap();
+        let glob_storage = CONFIG.storage().get_object_store();
 
-        let time_filters = extract_primary_filter(filters);
+        let object_store_format = glob_storage
+            .get_object_store_format(&self.stream)
+            .await
+            .map_err(|err| DataFusionError::Plan(err.to_string()))?;
+        let time_partition = object_store_format.time_partition;
+        let time_filters = extract_primary_filter(filters, time_partition.clone());
         if time_filters.is_empty() {
             return Err(DataFusionError::Plan("potentially unbounded query on time range. Table scanning requires atleast one time bound".to_string()));
         }
 
-        if include_now(filters) {
+        if include_now(filters, time_partition) {
             if let Some(records) =
                 event::STREAM_WRITERS.recordbatches_cloned(&self.stream, &self.schema)
             {
@@ -306,18 +317,8 @@ impl TableProvider for StandardTableProvider {
             }
         };
 
-        let object_store = state
-            .runtime_env()
-            .object_store_registry
-            .get_store(&self.url)
-            .unwrap();
-        let glob_storage = CONFIG.storage().get_object_store();
-
         // Fetch snapshot
-        let snapshot = glob_storage
-            .get_snapshot(&self.stream)
-            .await
-            .map_err(|err| DataFusionError::Plan(err.to_string()))?;
+        let snapshot = object_store_format.snapshot;
 
         // Is query timerange is overlapping with older data.
         if is_overlapping_query(&snapshot.manifest_list, &time_filters) {
@@ -492,11 +493,11 @@ pub enum PartialTimeFilter {
 }
 
 impl PartialTimeFilter {
-    fn try_from_expr(expr: &Expr) -> Option<Self> {
+    fn try_from_expr(expr: &Expr, time_partition: Option<String>) -> Option<Self> {
         let Expr::BinaryExpr(binexpr) = expr else {
             return None;
         };
-        let (op, time) = extract_timestamp_bound(binexpr)?;
+        let (op, time) = extract_timestamp_bound(binexpr.clone(), time_partition)?;
         let value = match op {
             Operator::Gt => PartialTimeFilter::Low(Bound::Excluded(time)),
             Operator::GtEq => PartialTimeFilter::Low(Bound::Included(time)),
@@ -509,7 +510,7 @@ impl PartialTimeFilter {
         Some(value)
     }
 
-    pub fn binary_expr(&self, left: Expr) -> Expr {
+    pub fn binary_expr_default_timestamp_key(&self, left: Expr) -> Expr {
         let (op, right) = match self {
             PartialTimeFilter::Low(Bound::Excluded(time)) => {
                 (Operator::Gt, time.timestamp_millis())
@@ -537,6 +538,26 @@ impl PartialTimeFilter {
         ))
     }
 
+    pub fn binary_expr_timestamp_partition_key(&self, left: Expr) -> Expr {
+        let (op, right) = match self {
+            PartialTimeFilter::Low(Bound::Excluded(time)) => (Operator::Gt, time),
+            PartialTimeFilter::Low(Bound::Included(time)) => (Operator::GtEq, time),
+            PartialTimeFilter::High(Bound::Excluded(time)) => (Operator::Lt, time),
+            PartialTimeFilter::High(Bound::Included(time)) => (Operator::LtEq, time),
+            PartialTimeFilter::Eq(time) => (Operator::Eq, time),
+            _ => unimplemented!(),
+        };
+
+        Expr::BinaryExpr(BinaryExpr::new(
+            Box::new(left),
+            op,
+            Box::new(Expr::Literal(ScalarValue::Utf8(Some(format!(
+                "{:?}",
+                right
+            ))))),
+        ))
+    }
+
     fn is_greater_than(&self, other: &NaiveDateTime) -> bool {
         match self {
             PartialTimeFilter::Low(Bound::Excluded(time)) => time >= other,
@@ -566,14 +587,14 @@ fn is_overlapping_query(
         .all(|filter| filter.is_greater_than(&first_entry_upper_bound.naive_utc()))
 }
 
-fn include_now(filters: &[Expr]) -> bool {
+fn include_now(filters: &[Expr], time_partition: Option<String>) -> bool {
     let current_minute = Utc::now()
         .with_second(0)
        .and_then(|x| x.with_nanosecond(0))
.expect("zeroed value is valid") .naive_utc(); - let time_filters = extract_primary_filter(filters); + let time_filters = extract_primary_filter(filters, time_partition); let upper_bound_matches = time_filters.iter().any(|filter| match filter { PartialTimeFilter::High(Bound::Excluded(time)) @@ -598,7 +619,7 @@ fn expr_in_boundary(filter: &Expr) -> bool { let Expr::BinaryExpr(binexpr) = filter else { return false; }; - let Some((op, time)) = extract_timestamp_bound(binexpr) else { + let Some((op, time)) = extract_timestamp_bound(binexpr.clone(), None) else { return false; }; @@ -612,11 +633,22 @@ fn expr_in_boundary(filter: &Expr) -> bool { ) } -fn extract_from_lit(expr: &Expr) -> Option { - if let Expr::Literal(value) = expr { +fn extract_from_lit(expr: BinaryExpr, time_partition: Option) -> Option { + let mut column_name: String = String::default(); + if let Expr::Column(column) = *expr.left { + column_name = column.name; + } + if let Expr::Literal(value) = *expr.right { match value { ScalarValue::TimestampMillisecond(Some(value), _) => { - Some(NaiveDateTime::from_timestamp_millis(*value).unwrap()) + Some(NaiveDateTime::from_timestamp_millis(value).unwrap()) + } + ScalarValue::Utf8(Some(str_value)) => { + if time_partition.is_some() && column_name == time_partition.unwrap() { + Some(str_value.parse::().unwrap()) + } else { + None + } } _ => None, } @@ -625,14 +657,11 @@ fn extract_from_lit(expr: &Expr) -> Option { } } -fn extract_timestamp_bound(binexpr: &BinaryExpr) -> Option<(Operator, NaiveDateTime)> { - if matches!(&*binexpr.left, Expr::Column(Column { name, .. }) if name == DEFAULT_TIMESTAMP_KEY) - { - let time = extract_from_lit(&binexpr.right)?; - Some((binexpr.op, time)) - } else { - None - } +fn extract_timestamp_bound( + binexpr: BinaryExpr, + time_partition: Option, +) -> Option<(Operator, NaiveDateTime)> { + Some((binexpr.op, extract_from_lit(binexpr, time_partition)?)) } async fn collect_manifest_files( @@ -658,11 +687,14 @@ async fn collect_manifest_files( } // extract start time and end time from filter preficate -fn extract_primary_filter(filters: &[Expr]) -> Vec { +fn extract_primary_filter( + filters: &[Expr], + time_partition: Option, +) -> Vec { let mut time_filters = Vec::new(); filters.iter().for_each(|expr| { let _ = expr.apply(&mut |expr| { - let time = PartialTimeFilter::try_from_expr(expr); + let time = PartialTimeFilter::try_from_expr(expr, time_partition.clone()); if let Some(time) = time { time_filters.push(time); Ok(VisitRecursion::Stop) diff --git a/server/src/storage.rs b/server/src/storage.rs index b6d17484f..5602b1984 100644 --- a/server/src/storage.rs +++ b/server/src/storage.rs @@ -82,6 +82,8 @@ pub struct ObjectStoreFormat { pub cache_enabled: bool, #[serde(skip_serializing_if = "Option::is_none")] pub retention: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub time_partition: Option, } #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] @@ -126,6 +128,7 @@ impl Default for ObjectStoreFormat { snapshot: Snapshot::default(), cache_enabled: false, retention: None, + time_partition: None, } } } diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs index 7494d16e1..9f016f5a9 100644 --- a/server/src/storage/object_storage.rs +++ b/server/src/storage/object_storage.rs @@ -106,14 +106,22 @@ pub trait ObjectStorage: Sync + 'static { Ok(()) } - async fn create_stream(&self, stream_name: &str) -> Result<(), ObjectStorageError> { + async fn create_stream( + &self, + stream_name: &str, + 
diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs
index 7494d16e1..9f016f5a9 100644
--- a/server/src/storage/object_storage.rs
+++ b/server/src/storage/object_storage.rs
@@ -106,14 +106,22 @@ pub trait ObjectStorage: Sync + 'static {
         Ok(())
     }
 
-    async fn create_stream(&self, stream_name: &str) -> Result<(), ObjectStorageError> {
+    async fn create_stream(
+        &self,
+        stream_name: &str,
+        time_partition: &str,
+    ) -> Result<(), ObjectStorageError> {
         let mut format = ObjectStoreFormat::default();
         format.set_id(CONFIG.parseable.username.clone());
         let permission = Permisssion::new(CONFIG.parseable.username.clone());
         format.permissions = vec![permission];
+        if time_partition.is_empty() {
+            format.time_partition = None;
+        } else {
+            format.time_partition = Some(time_partition.to_string());
+        }
         let format_json = to_bytes(&format);
-
         self.put_object(&schema_path(stream_name), to_bytes(&Schema::empty()))
             .await?;
@@ -292,12 +300,13 @@ pub trait ObjectStorage: Sync + 'static {
         self.put_object(&path, to_bytes(&manifest)).await
     }
 
-    async fn get_snapshot(&self, stream: &str) -> Result<Snapshot, ObjectStorageError> {
+    async fn get_object_store_format(
+        &self,
+        stream: &str,
+    ) -> Result<ObjectStoreFormat, ObjectStorageError> {
         let path = stream_json_path(stream);
         let bytes = self.get_object(&path).await?;
-        Ok(serde_json::from_slice::<ObjectStoreFormat>(&bytes)
-            .expect("snapshot is valid json")
-            .snapshot)
+        Ok(serde_json::from_slice::<ObjectStoreFormat>(&bytes).expect("snapshot is valid json"))
     }
 
     async fn put_snapshot(
diff --git a/server/src/storage/staging.rs b/server/src/storage/staging.rs
index 31c5dffed..c07d0b24b 100644
--- a/server/src/storage/staging.rs
+++ b/server/src/storage/staging.rs
@@ -26,7 +26,7 @@ use std::{
 };
 
 use arrow_schema::{ArrowError, Schema};
-use chrono::{NaiveDateTime, Timelike, Utc};
+use chrono::{NaiveDateTime, Timelike};
 use parquet::{
     arrow::ArrowWriter,
     basic::Encoding,
@@ -43,7 +43,7 @@ use crate::{
     storage::OBJECT_STORE_DATA_GRANULARITY,
     utils::{self, arrow::merged_reader::MergedReverseRecordReader},
 };
-
+use rand::Rng;
 const ARROW_FILE_EXTENSION: &str = "data.arrows";
 const PARQUET_FILE_EXTENSION: &str = "data.parquet";
@@ -76,14 +76,19 @@ impl StorageDir {
         )
     }
 
-    fn filename_by_current_time(stream_hash: &str) -> String {
-        let datetime = Utc::now();
-        Self::filename_by_time(stream_hash, datetime.naive_utc())
+    fn filename_by_current_time(stream_hash: &str, parsed_timestamp: NaiveDateTime) -> String {
+        Self::filename_by_time(stream_hash, parsed_timestamp)
     }
 
-    pub fn path_by_current_time(&self, stream_hash: &str) -> PathBuf {
-        self.data_path
-            .join(Self::filename_by_current_time(stream_hash))
+    pub fn path_by_current_time(
+        &self,
+        stream_hash: &str,
+        parsed_timestamp: NaiveDateTime,
+    ) -> PathBuf {
+        self.data_path.join(Self::filename_by_current_time(
+            stream_hash,
+            parsed_timestamp,
+        ))
     }
 
     pub fn arrow_files(&self) -> Vec<PathBuf> {
@@ -157,10 +162,13 @@ impl StorageDir {
 }
 
 fn arrow_path_to_parquet(path: &Path) -> PathBuf {
-    let filename = path.file_name().unwrap().to_str().unwrap();
-    let (_, filename) = filename.split_once('.').unwrap();
+    let file_stem = path.file_stem().unwrap().to_str().unwrap();
+    let mut rng = rand::thread_rng();
+    let random_number: u64 = rng.gen();
+    let (_, filename) = file_stem.split_once('.').unwrap();
+    let filename_with_random_number = format!("{}.{}.{}", filename, random_number, "arrows");
     let mut parquet_path = path.to_owned();
-    parquet_path.set_file_name(filename);
+    parquet_path.set_file_name(filename_with_random_number);
     parquet_path.set_extension("parquet");
     parquet_path
 }
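The staging rename guards against collisions now that arrow file names come from event timestamps rather than wall-clock time: two batches with the same parsed timestamp would otherwise map to the same parquet name. A runnable restatement of the function above (the sample path is hypothetical):

```rust
use rand::Rng;
use std::path::{Path, PathBuf};

// The schema-hash prefix is dropped from the arrow file's stem and a random
// number is spliced in; set_extension then swaps the trailing "arrows" for
// "parquet", yielding "<rest-of-name>.<random>.parquet".
fn arrow_path_to_parquet(path: &Path) -> PathBuf {
    let file_stem = path.file_stem().unwrap().to_str().unwrap();
    let random_number: u64 = rand::thread_rng().gen();
    let (_, filename) = file_stem.split_once('.').unwrap();
    let mut parquet_path = path.to_owned();
    parquet_path.set_file_name(format!("{filename}.{random_number}.arrows"));
    parquet_path.set_extension("parquet");
    parquet_path
}

fn main() {
    let p = arrow_path_to_parquet(Path::new("schemahash.date=2024-03-01.hour=10.data.arrows"));
    println!("{}", p.display());
}
```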
failed beforehand"), + }; + Ok(value_arr) +} + pub fn convert_to_string(value: &Value) -> Value { match value { Value::Null => Value::String("null".to_owned()),