From e8d3c19a1d7b85e26578c6fdacc600aaf253f64e Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Fri, 1 Mar 2024 20:18:39 +0530 Subject: [PATCH 1/9] enhancement to use timestamp for partition in ingesting logs instead of using server time --- server/src/event.rs | 8 +- server/src/event/writer.rs | 22 ++++-- server/src/event/writer/file_writer.rs | 13 ++-- server/src/handlers.rs | 4 +- server/src/handlers/http/ingest.rs | 103 +++++++++++++++++++------ server/src/handlers/http/logstream.rs | 61 +++++++++++++-- server/src/metadata.rs | 23 +++++- server/src/storage.rs | 9 +++ server/src/storage/object_storage.rs | 20 ++++- server/src/storage/staging.rs | 11 ++- 10 files changed, 214 insertions(+), 60 deletions(-) diff --git a/server/src/event.rs b/server/src/event.rs index 62db832bf..0b118ea08 100644 --- a/server/src/event.rs +++ b/server/src/event.rs @@ -27,7 +27,7 @@ use itertools::Itertools; use std::sync::Arc; use crate::metadata; - +use chrono::NaiveDateTime; use self::error::EventError; pub use self::writer::STREAM_WRITERS; @@ -42,6 +42,7 @@ pub struct Event { pub origin_format: &'static str, pub origin_size: u64, pub is_first_event: bool, + pub parsed_timestamp: NaiveDateTime, } // Events holds the schema related to a each event for a single log stream @@ -54,7 +55,7 @@ impl Event { commit_schema(&self.stream_name, self.rb.schema())?; } - Self::process_event(&self.stream_name, &key, self.rb.clone())?; + Self::process_event(&self.stream_name, &key, self.rb.clone(), self.parsed_timestamp)?; metadata::STREAM_INFO.update_stats( &self.stream_name, @@ -81,8 +82,9 @@ impl Event { stream_name: &str, schema_key: &str, rb: RecordBatch, + parsed_timestamp: NaiveDateTime, ) -> Result<(), EventError> { - STREAM_WRITERS.append_to_local(stream_name, schema_key, rb)?; + STREAM_WRITERS.append_to_local(stream_name, schema_key, rb, parsed_timestamp)?; Ok(()) } } diff --git a/server/src/event/writer.rs b/server/src/event/writer.rs index 2d1b46d4e..6b5bbcd3c 100644 --- a/server/src/event/writer.rs +++ b/server/src/event/writer.rs @@ -30,7 +30,7 @@ use crate::utils; use self::{errors::StreamWriterError, file_writer::FileWriter, mem_writer::MemWriter}; use arrow_array::{RecordBatch, TimestampMillisecondArray}; use arrow_schema::Schema; -use chrono::Utc; +use chrono::NaiveDateTime; use derive_more::{Deref, DerefMut}; use once_cell::sync::Lazy; @@ -48,15 +48,17 @@ impl Writer { stream_name: &str, schema_key: &str, rb: RecordBatch, + parsed_timestamp: NaiveDateTime ) -> Result<(), StreamWriterError> { + let rb = utils::arrow::replace_columns( rb.schema(), &rb, &[0], - &[Arc::new(get_timestamp_array(rb.num_rows()))], + &[Arc::new(get_timestamp_array(rb.num_rows(), parsed_timestamp))], ); - self.disk.push(stream_name, schema_key, &rb)?; + self.disk.push(stream_name, schema_key, &rb, parsed_timestamp)?; self.mem.push(schema_key, rb); Ok(()) } @@ -72,6 +74,7 @@ impl WriterTable { stream_name: &str, schema_key: &str, record: RecordBatch, + parsed_timestamp: NaiveDateTime ) -> Result<(), StreamWriterError> { let hashmap_guard = self.read().unwrap(); @@ -80,7 +83,7 @@ impl WriterTable { stream_writer .lock() .unwrap() - .push(stream_name, schema_key, record)?; + .push(stream_name, schema_key, record, parsed_timestamp)?; } None => { drop(hashmap_guard); @@ -91,10 +94,10 @@ impl WriterTable { writer .lock() .unwrap() - .push(stream_name, schema_key, record)?; + .push(stream_name, schema_key, record, parsed_timestamp)?; } else { let mut writer = Writer::default(); - writer.push(stream_name, schema_key, record)?; + 
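// Editorial note (not part of the patch): this first push carries the event's
// parsed_timestamp, so the brand-new writer lays the record out under an arrow
// file path derived from the log's own timestamp field rather than server time,
// and the writer is then registered in the table for subsequent batches.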
writer.push(stream_name, schema_key, record, parsed_timestamp)?; map.insert(stream_name.to_owned(), Mutex::new(writer)); } } @@ -135,8 +138,11 @@ impl WriterTable { } } -fn get_timestamp_array(size: usize) -> TimestampMillisecondArray { - TimestampMillisecondArray::from_value(Utc::now().timestamp_millis(), size) +fn get_timestamp_array(size: usize, parsed_timestamp: NaiveDateTime) -> TimestampMillisecondArray { + println!("parsed_timestamp: {:?}", parsed_timestamp); + println!("parsed_timestamp: {:?}", parsed_timestamp.timestamp_millis()); + TimestampMillisecondArray::from_value(parsed_timestamp.timestamp_millis(), size) + } pub mod errors { diff --git a/server/src/event/writer/file_writer.rs b/server/src/event/writer/file_writer.rs index 9ff62c5c3..6d8ea28a0 100644 --- a/server/src/event/writer/file_writer.rs +++ b/server/src/event/writer/file_writer.rs @@ -20,7 +20,7 @@ use std::collections::HashMap; use std::fs::{File, OpenOptions}; use std::path::PathBuf; - +use chrono::NaiveDateTime; use arrow_array::RecordBatch; use arrow_ipc::writer::StreamWriter; use derive_more::{Deref, DerefMut}; @@ -44,6 +44,7 @@ impl FileWriter { stream_name: &str, schema_key: &str, record: &RecordBatch, + parsed_timestamp: NaiveDateTime ) -> Result<(), StreamWriterError> { match self.get_mut(schema_key) { Some(writer) => { @@ -55,7 +56,8 @@ impl FileWriter { // entry is not present thus we create it None => { // this requires mutable borrow of the map so we drop this read lock and wait for write lock - let (path, writer) = init_new_stream_writer_file(stream_name, schema_key, record)?; + let (path, writer) = init_new_stream_writer_file(stream_name, schema_key, record, parsed_timestamp)?; + println!("path: {:?}", path); self.insert( schema_key.to_owned(), ArrowWriter { @@ -80,10 +82,12 @@ fn init_new_stream_writer_file( stream_name: &str, schema_key: &str, record: &RecordBatch, + parsed_timestamp: NaiveDateTime, ) -> Result<(PathBuf, StreamWriter), StreamWriterError> { let dir = StorageDir::new(stream_name); - let path = dir.path_by_current_time(schema_key); - + println!("dir: {:?}", dir); + let path = dir.path_by_current_time(schema_key, parsed_timestamp); + println!("path: {:?}", path); std::fs::create_dir_all(dir.data_path)?; let file = OpenOptions::new().create(true).append(true).open(&path)?; @@ -94,6 +98,5 @@ fn init_new_stream_writer_file( stream_writer .write(record) .map_err(StreamWriterError::Writer)?; - Ok((path, stream_writer)) } diff --git a/server/src/handlers.rs b/server/src/handlers.rs index 81beea0bd..bc73e0467 100644 --- a/server/src/handlers.rs +++ b/server/src/handlers.rs @@ -23,7 +23,9 @@ const PREFIX_TAGS: &str = "x-p-tag-"; const PREFIX_META: &str = "x-p-meta-"; const STREAM_NAME_HEADER_KEY: &str = "x-p-stream"; const LOG_SOURCE_KEY: &str = "x-p-log-source"; - +const TIME_PARTITION_KEY: &str = "x-p-time-partition"; +const TIME_PARTITION_FORMAT_KEY: &str = "x-p-time-partition-format"; +const TIME_PARTITION_TIMEZONE_KEY: &str = "x-p-time-partition-timezone"; const AUTHORIZATION_KEY: &str = "authorization"; const SEPARATOR: char = '^'; diff --git a/server/src/handlers/http/ingest.rs b/server/src/handlers/http/ingest.rs index 2a3281843..a32f61c1f 100644 --- a/server/src/handlers/http/ingest.rs +++ b/server/src/handlers/http/ingest.rs @@ -23,6 +23,7 @@ use http::StatusCode; use serde_json::Value; use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; +use chrono::{DateTime, Utc, NaiveDateTime}; use crate::event::error::EventError; use crate::event::format::EventFormat; @@ -36,7 +37,6 @@ 
use crate::utils::header_parsing::{collect_labelled_headers, ParseHeaderError}; use super::logstream::error::CreateStreamError; use super::{kinesis, otel}; - // Handler for POST /api/v1/ingest // ingests events by extracting stream name from header // creates if stream does not exist @@ -94,14 +94,29 @@ pub async fn post_event(req: HttpRequest, body: Bytes) -> Result Result<(), PostError> { - let (size, rb, is_first_event) = { + let (size, rb, is_first_event, parsed_timestamp) = { let hash_map = STREAM_INFO.read().unwrap(); let schema = hash_map .get(&stream_name) .ok_or(PostError::StreamNotFound(stream_name.clone()))? .schema .clone(); - into_event_batch(req, body, schema)? + + let time_partition = hash_map + .get(&stream_name) + .ok_or(PostError::StreamNotFound(stream_name.clone()))? + .time_partition + .clone(); + println!("time_partition: {:?}", time_partition); + + let time_partition_format = hash_map + .get(&stream_name) + .ok_or(PostError::StreamNotFound(stream_name.clone()))? + .time_partition_format + .clone(); + println!("time_partition: {:?}", time_partition_format); + + into_event_batch(req, body, schema, time_partition, time_partition_format)? }; event::Event { @@ -110,29 +125,52 @@ async fn push_logs(stream_name: String, req: HttpRequest, body: Bytes) -> Result origin_format: "json", origin_size: size as u64, is_first_event, + parsed_timestamp, } .process() .await?; Ok(()) -} +} fn into_event_batch( req: HttpRequest, body: Bytes, schema: HashMap>, -) -> Result<(usize, arrow_array::RecordBatch, bool), PostError> { + time_partition: Option, + time_partition_format: Option, +) -> Result<(usize, arrow_array::RecordBatch, bool, NaiveDateTime ), PostError> { let tags = collect_labelled_headers(&req, PREFIX_TAGS, SEPARATOR)?; let metadata = collect_labelled_headers(&req, PREFIX_META, SEPARATOR)?; let size = body.len(); let body: Value = serde_json::from_slice(&body)?; + let body_timestamp_format: &str = "%Y-%m-%dT%H:%M:%S%Z"; + let mut ingestion_prefix_timestamp = Utc::now().naive_utc(); + if time_partition.is_some() && time_partition_format.is_some(){ + let body_timestamp = body.get(&time_partition.clone().unwrap().to_string()); + if body_timestamp.is_some(){ + if body_timestamp.unwrap().to_owned().as_str().unwrap().parse::>().is_ok(){ + ingestion_prefix_timestamp = body_timestamp.unwrap().to_owned().as_str().unwrap().parse::>().unwrap().naive_utc(); + println!("ingestion_prefix_timestamp: {:?}", ingestion_prefix_timestamp); + + } + else{ + return Err(PostError::Invalid(anyhow::Error::msg(format!("field {} is not in the correct format {}", body_timestamp.unwrap().to_owned().as_str().unwrap(), body_timestamp_format)))); + } + + }else{ + return Err(PostError::Invalid(anyhow::Error::msg(format!("field {} is not part of the log", time_partition.unwrap())))); + } + } + println!("ingestion_prefix_timestamp: {:?}", ingestion_prefix_timestamp); let event = format::json::Event { data: body, tags, metadata, + }; let (rb, is_first) = event.into_recordbatch(schema)?; - Ok((size, rb, is_first)) + Ok((size, rb, is_first, ingestion_prefix_timestamp)) } // Check if the stream exists and create a new stream if doesn't exist @@ -140,7 +178,7 @@ pub async fn create_stream_if_not_exists(stream_name: &str) -> Result<(), PostEr if STREAM_INFO.stream_exists(stream_name) { return Ok(()); } - super::logstream::create_stream(stream_name.to_string()).await?; + super::logstream::create_stream(stream_name.to_string(), String::new(), String::new(), String::new()).await?; Ok(()) } @@ -239,10 +277,12 @@ mod tests 
{ .append_header((PREFIX_META.to_string() + "C", "meta1")) .to_http_request(); - let (size, rb, _) = into_event_batch( + let (size, rb, _, _) = into_event_batch( req, Bytes::from(serde_json::to_vec(&json).unwrap()), HashMap::default(), + None, + None, ) .unwrap(); @@ -285,10 +325,12 @@ mod tests { let req = TestRequest::default().to_http_request(); - let (_, rb, _) = into_event_batch( + let (_, rb, _, _) = into_event_batch( req, Bytes::from(serde_json::to_vec(&json).unwrap()), HashMap::default(), + None, + None, ) .unwrap(); @@ -322,8 +364,9 @@ mod tests { let req = TestRequest::default().to_http_request(); - let (_, rb, _) = - into_event_batch(req, Bytes::from(serde_json::to_vec(&json).unwrap()), schema).unwrap(); + let (_, rb, _, _) = + into_event_batch(req, Bytes::from(serde_json::to_vec(&json).unwrap()), schema,None, + None).unwrap(); assert_eq!(rb.num_rows(), 1); assert_eq!(rb.num_columns(), 5); @@ -356,7 +399,8 @@ mod tests { let req = TestRequest::default().to_http_request(); assert!( - into_event_batch(req, Bytes::from(serde_json::to_vec(&json).unwrap()), schema,) + into_event_batch(req, Bytes::from(serde_json::to_vec(&json).unwrap()), schema,None, + None) .is_err() ); } @@ -376,8 +420,9 @@ mod tests { let req = TestRequest::default().to_http_request(); - let (_, rb, _) = - into_event_batch(req, Bytes::from(serde_json::to_vec(&json).unwrap()), schema).unwrap(); + let (_, rb, _, _) = + into_event_batch(req, Bytes::from(serde_json::to_vec(&json).unwrap()), schema,None, + None).unwrap(); assert_eq!(rb.num_rows(), 1); assert_eq!(rb.num_columns(), 3); @@ -393,6 +438,8 @@ mod tests { req, Bytes::from(serde_json::to_vec(&json).unwrap()), HashMap::default(), + None, + None, ) .is_err()) } @@ -417,10 +464,11 @@ mod tests { let req = TestRequest::default().to_http_request(); - let (_, rb, _) = into_event_batch( + let (_, rb, _, _) = into_event_batch( req, Bytes::from(serde_json::to_vec(&json).unwrap()), - HashMap::default(), + HashMap::default(),None, + None, ) .unwrap(); @@ -470,10 +518,11 @@ mod tests { let req = TestRequest::default().to_http_request(); - let (_, rb, _) = into_event_batch( + let (_, rb, _,_) = into_event_batch( req, Bytes::from(serde_json::to_vec(&json).unwrap()), - HashMap::default(), + HashMap::default(),None, + None, ) .unwrap(); @@ -523,8 +572,9 @@ mod tests { ); let req = TestRequest::default().to_http_request(); - let (_, rb, _) = - into_event_batch(req, Bytes::from(serde_json::to_vec(&json).unwrap()), schema).unwrap(); + let (_, rb, _,_) = + into_event_batch(req, Bytes::from(serde_json::to_vec(&json).unwrap()), schema,None, + None).unwrap(); assert_eq!(rb.num_rows(), 3); assert_eq!(rb.num_columns(), 6); @@ -564,10 +614,11 @@ mod tests { let req = TestRequest::default().to_http_request(); - let (_, rb, _) = into_event_batch( + let (_, rb, _,_) = into_event_batch( req, Bytes::from(serde_json::to_vec(&json).unwrap()), - HashMap::default(), + HashMap::default(),None, + None, ) .unwrap(); @@ -615,7 +666,8 @@ mod tests { ); assert!( - into_event_batch(req, Bytes::from(serde_json::to_vec(&json).unwrap()), schema,) + into_event_batch(req, Bytes::from(serde_json::to_vec(&json).unwrap()), schema,None, + None,) .is_err() ); } @@ -645,10 +697,11 @@ mod tests { let req = TestRequest::default().to_http_request(); - let (_, rb, _) = into_event_batch( + let (_, rb, _,_) = into_event_batch( req, Bytes::from(serde_json::to_vec(&json).unwrap()), - HashMap::default(), + HashMap::default(),None, + None, ) .unwrap(); diff --git a/server/src/handlers/http/logstream.rs 
b/server/src/handlers/http/logstream.rs index e31a4d44d..311dc5e14 100644 --- a/server/src/handlers/http/logstream.rs +++ b/server/src/handlers/http/logstream.rs @@ -30,7 +30,7 @@ use crate::storage::retention::Retention; use crate::storage::{LogStream, StorageDir}; use crate::{catalog, event, stats}; use crate::{metadata, validator}; - +use crate::handlers::{TIME_PARTITION_FORMAT_KEY, TIME_PARTITION_KEY, TIME_PARTITION_TIMEZONE_KEY}; use self::error::{CreateStreamError, StreamError}; pub async fn delete(req: HttpRequest) -> Result { @@ -110,6 +110,36 @@ pub async fn get_alert(req: HttpRequest) -> Result } pub async fn put_stream(req: HttpRequest) -> Result { + let mut time_partition: String = String::new(); + let mut time_partition_format: String = String::new(); + let mut time_partition_timezone: String = String::new(); + if let Some((_, time_partition_name)) = req + .headers() + .iter() + .find(|&(key, _)| key == TIME_PARTITION_KEY) + { + time_partition = time_partition_name.to_str().unwrap().to_owned(); + println!("time_partition: {:?}", time_partition); + + if let Some((_, header_time_partition_format)) = req + .headers() + .iter() + .find(|&(key, _)| key == TIME_PARTITION_FORMAT_KEY){ + time_partition_format = header_time_partition_format.to_str().unwrap().to_owned(); + println!("time_partition_format: {:?}", time_partition_format); + }else { + return Err(StreamError::TimePartitionFormatMissing(time_partition)); + } + if let Some((_, header_time_partition_timezone)) = req + .headers() + .iter() + .find(|&(key, _)| key == TIME_PARTITION_TIMEZONE_KEY){ + time_partition_timezone = header_time_partition_timezone.to_str().unwrap().to_owned(); + println!("time_partition_timezone: {:?}", time_partition_timezone); + }else { + // return Err(StreamError::TimePartitionTimeZoneMissing(time_partition)); + } + } let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); if metadata::STREAM_INFO.stream_exists(&stream_name) { @@ -121,7 +151,7 @@ pub async fn put_stream(req: HttpRequest) -> Result status: StatusCode::BAD_REQUEST, }); } else { - create_stream(stream_name).await?; + create_stream(stream_name, time_partition, time_partition_format, time_partition_timezone).await?; } Ok(("log stream created", StatusCode::OK)) @@ -328,24 +358,33 @@ fn remove_id_from_alerts(value: &mut Value) { } } -pub async fn create_stream(stream_name: String) -> Result<(), CreateStreamError> { +pub async fn create_stream(stream_name: String, time_partition: String, time_partition_format: String, time_partition_timezone: String) -> Result<(), CreateStreamError> { // fail to proceed if invalid stream name validator::stream_name(&stream_name)?; // Proceed to create log stream if it doesn't exist let storage = CONFIG.storage().get_object_store(); - if let Err(err) = storage.create_stream(&stream_name).await { + if let Err(err) = storage.create_stream(&stream_name, &time_partition, &time_partition_format, &time_partition_timezone).await { return Err(CreateStreamError::Storage { stream_name, err }); } - let stream_meta = CONFIG + + let stream_meta: Result = CONFIG .storage() .get_object_store() .get_stream_metadata(&stream_name) .await; - let created_at = stream_meta.unwrap().created_at; - - metadata::STREAM_INFO.add_stream(stream_name.to_string(), created_at); + let stream_meta = stream_meta.unwrap(); + let created_at = stream_meta.created_at; + let mut time_partition: String = String::new(); // Initialize time_partition with an empty string + let mut time_partition_format: String = 
String::new(); + let mut time_partition_timezone: String = String::new(); + if stream_meta.time_partition.is_some() && stream_meta.time_partition_format.is_some() && stream_meta.time_partition_timezone.is_some(){ + time_partition = stream_meta.time_partition.unwrap(); + time_partition_format = stream_meta.time_partition_format.unwrap(); + time_partition_timezone = stream_meta.time_partition_timezone.unwrap(); + } + metadata::STREAM_INFO.add_stream(stream_name.to_string(), created_at, time_partition, time_partition_format, time_partition_timezone); Ok(()) } @@ -405,6 +444,10 @@ pub mod error { InvalidRetentionConfig(serde_json::Error), #[error("{msg}")] Custom { msg: String, status: StatusCode }, + #[error("X-P-Time-Partition-Format header is missing for \"{0}\" X-P-Time-Partition header")] + TimePartitionFormatMissing(String), + #[error("X-P-Time-Partition-TimeZone header is missing for \"{0}\" X-P-Time-Partition header")] + TimePartitionTimeZoneMissing(String), } impl actix_web::ResponseError for StreamError { @@ -427,6 +470,8 @@ pub mod error { StreamError::InvalidAlert(_) => StatusCode::BAD_REQUEST, StreamError::InvalidAlertMessage(_, _) => StatusCode::BAD_REQUEST, StreamError::InvalidRetentionConfig(_) => StatusCode::BAD_REQUEST, + StreamError::TimePartitionFormatMissing(_) => StatusCode::BAD_REQUEST, + StreamError::TimePartitionTimeZoneMissing(_) => StatusCode::BAD_REQUEST } } diff --git a/server/src/metadata.rs b/server/src/metadata.rs index e8a250719..6908c7acd 100644 --- a/server/src/metadata.rs +++ b/server/src/metadata.rs @@ -46,6 +46,9 @@ pub struct LogStreamMetadata { pub cache_enabled: bool, pub created_at: String, pub first_event_at: Option, + pub time_partition: Option, + pub time_partition_format: Option, + pub time_partition_timezone: Option } // It is very unlikely that panic will occur when dealing with metadata. 
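// Editorial note (not part of the patch): the three Option fields above are
// filled from the stream-creation headers declared in handlers.rs
// (x-p-time-partition, x-p-time-partition-format, x-p-time-partition-timezone).
// A minimal sketch of the storage convention, using an illustrative helper name
// (the patch inlines this logic in add_stream below): an empty header value
// means "not configured" and is kept as None.
fn non_empty(value: String) -> Option<String> {
    if value.is_empty() {
        None
    } else {
        Some(value)
    }
}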
@@ -142,7 +145,7 @@ impl StreamInfo { }) } - pub fn add_stream(&self, stream_name: String, created_at: String) { + pub fn add_stream(&self, stream_name: String, created_at: String, time_partition: String, time_partition_format: String, time_partition_timezone: String) { let mut map = self.write().expect(LOCK_EXPECT); let metadata = LogStreamMetadata { created_at: if created_at.is_empty() { @@ -150,6 +153,21 @@ impl StreamInfo { } else { created_at.clone() }, + time_partition: if time_partition.is_empty() { + None + } else { + Some(time_partition.clone()) + }, + time_partition_format: if time_partition_format.is_empty() { + None + } else { + Some(time_partition_format.clone()) + }, + time_partition_timezone: if time_partition_timezone.is_empty() { + None + } else { + Some(time_partition_timezone.clone()) + }, ..Default::default() }; map.insert(stream_name, metadata); @@ -185,6 +203,9 @@ impl StreamInfo { cache_enabled: meta.cache_enabled, created_at: meta.created_at, first_event_at: meta.first_event_at, + time_partition: meta.time_partition, + time_partition_format: meta.time_partition_format, + time_partition_timezone: meta.time_partition_timezone }; let mut map = self.write().expect(LOCK_EXPECT); diff --git a/server/src/storage.rs b/server/src/storage.rs index b6d17484f..68cdccfee 100644 --- a/server/src/storage.rs +++ b/server/src/storage.rs @@ -82,6 +82,12 @@ pub struct ObjectStoreFormat { pub cache_enabled: bool, #[serde(skip_serializing_if = "Option::is_none")] pub retention: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub time_partition: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub time_partition_format: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub time_partition_timezone: Option, } #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] @@ -126,6 +132,9 @@ impl Default for ObjectStoreFormat { snapshot: Snapshot::default(), cache_enabled: false, retention: None, + time_partition: None, + time_partition_format: None, + time_partition_timezone: None, } } } diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs index 7494d16e1..3af989728 100644 --- a/server/src/storage/object_storage.rs +++ b/server/src/storage/object_storage.rs @@ -106,19 +106,33 @@ pub trait ObjectStorage: Sync + 'static { Ok(()) } - async fn create_stream(&self, stream_name: &str) -> Result<(), ObjectStorageError> { + async fn create_stream(&self, stream_name: &str, time_partition: &str, time_partition_format: &str, time_partition_timezone: &str) -> Result<(), ObjectStorageError> { let mut format = ObjectStoreFormat::default(); format.set_id(CONFIG.parseable.username.clone()); let permission = Permisssion::new(CONFIG.parseable.username.clone()); format.permissions = vec![permission]; - + if time_partition.is_empty() { + format.time_partition = None; + } else { + format.time_partition = Some(time_partition.to_string()); + } + if time_partition_format.is_empty() { + format.time_partition_format = None; + } else { + format.time_partition_format = Some(time_partition_format.to_string()); + } + if time_partition_timezone.is_empty() { + format.time_partition_timezone = None; + } else { + format.time_partition_timezone = Some(time_partition_timezone.to_string()); + } let format_json = to_bytes(&format); - self.put_object(&schema_path(stream_name), to_bytes(&Schema::empty())) .await?; self.put_object(&stream_json_path(stream_name), format_json) .await?; + Ok(()) } diff --git a/server/src/storage/staging.rs 
b/server/src/storage/staging.rs index 31c5dffed..5644d8828 100644 --- a/server/src/storage/staging.rs +++ b/server/src/storage/staging.rs @@ -26,7 +26,7 @@ use std::{ }; use arrow_schema::{ArrowError, Schema}; -use chrono::{NaiveDateTime, Timelike, Utc}; +use chrono::{NaiveDateTime, Timelike}; use parquet::{ arrow::ArrowWriter, basic::Encoding, @@ -76,14 +76,13 @@ impl StorageDir { ) } - fn filename_by_current_time(stream_hash: &str) -> String { - let datetime = Utc::now(); - Self::filename_by_time(stream_hash, datetime.naive_utc()) + fn filename_by_current_time(stream_hash: &str, parsed_timestamp: NaiveDateTime) -> String { + Self::filename_by_time(stream_hash, parsed_timestamp) } - pub fn path_by_current_time(&self, stream_hash: &str) -> PathBuf { + pub fn path_by_current_time(&self, stream_hash: &str, parsed_timestamp: NaiveDateTime) -> PathBuf { self.data_path - .join(Self::filename_by_current_time(stream_hash)) + .join(Self::filename_by_current_time(stream_hash, parsed_timestamp)) } pub fn arrow_files(&self) -> Vec { From dcb254b5695cff2345b4b5f5f74e4dd843fb2b65 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Mon, 4 Mar 2024 15:50:18 +0530 Subject: [PATCH 2/9] enhancement to use timestamp for partition in ingesting logs instead of using server time --- server/src/catalog.rs | 68 ++++++++++++++++------ server/src/catalog/manifest.rs | 5 +- server/src/event/writer.rs | 12 ++-- server/src/event/writer/file_writer.rs | 31 +++------- server/src/handlers.rs | 2 - server/src/handlers/http/ingest.rs | 43 +++++--------- server/src/handlers/http/logstream.rs | 48 +++------------ server/src/handlers/http/query.rs | 4 +- server/src/metadata.rs | 16 +---- server/src/query.rs | 38 ++++++++---- server/src/query/stream_schema_provider.rs | 51 ++++++++++------ server/src/storage.rs | 6 -- server/src/storage/object_storage.rs | 31 +++++----- server/src/storage/staging.rs | 21 +++++-- 14 files changed, 183 insertions(+), 193 deletions(-) diff --git a/server/src/catalog.rs b/server/src/catalog.rs index 3ffdd21a1..a1face02c 100644 --- a/server/src/catalog.rs +++ b/server/src/catalog.rs @@ -22,9 +22,7 @@ use chrono::{DateTime, Local, NaiveDateTime, NaiveTime, Utc}; use relative_path::RelativePathBuf; use crate::{ - catalog::manifest::Manifest, - query::PartialTimeFilter, - storage::{ObjectStorage, ObjectStorageError}, + catalog::manifest::Manifest, event::DEFAULT_TIMESTAMP_KEY, query::PartialTimeFilter, storage::{ObjectStorage, ObjectStorageError} }; use self::{column::Column, snapshot::ManifestItem}; @@ -69,11 +67,12 @@ impl ManifestFile for manifest::File { } } -fn get_file_bounds(file: &manifest::File) -> (DateTime, DateTime) { - match file +fn get_file_bounds(file: &manifest::File, partition_column: String) -> (DateTime, DateTime) { + if partition_column == DEFAULT_TIMESTAMP_KEY.to_string(){ + match file .columns() .iter() - .find(|col| col.name == "p_timestamp") + .find(|col| col.name == partition_column) .unwrap() .stats .clone() @@ -89,28 +88,56 @@ fn get_file_bounds(file: &manifest::File) -> (DateTime, DateTime) { ), _ => unreachable!(), } +} +else{ + match file + .columns() + .iter() + .find(|col| col.name == partition_column) + .unwrap() + .stats + .clone() + .unwrap() +{ + column::TypedStatistics::String(stats) => ( + stats.min.parse::>().unwrap(), + stats.max.parse::>().unwrap(), + ), + _ => unreachable!(), +} +} + + } pub async fn update_snapshot( storage: Arc, stream_name: &str, change: manifest::File, + manifest_file_path: &str, ) -> Result<(), ObjectStorageError> { // get current snapshot 
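// Editorial note (not part of the patch): the hunk below switches the
// manifest-bound lookup to the stream's time-partition column when one is
// configured. A commented sketch of the equivalent selection, assuming the same
// items that are already in scope here:
//
//   let column = time_partition.unwrap_or_else(|| DEFAULT_TIMESTAMP_KEY.to_string());
//   let (_lower_bound, _) = get_file_bounds(&change, column);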
let mut meta = storage.get_snapshot(stream_name).await?; - let manifests = &mut meta.manifest_list; - - let (lower_bound, _) = get_file_bounds(&change); + let manifests = &mut meta.snapshot.manifest_list; + let time_partition = meta.time_partition; + let mut _lower_bound = Utc::now(); + if time_partition.is_none() { + (_lower_bound, _) = get_file_bounds(&change, DEFAULT_TIMESTAMP_KEY.to_string()); + } + else{ + (_lower_bound, _) = get_file_bounds(&change, time_partition.unwrap().to_string()); + } let pos = manifests.iter().position(|item| { - item.time_lower_bound <= lower_bound && lower_bound < item.time_upper_bound + item.time_lower_bound <= _lower_bound && _lower_bound < item.time_upper_bound }); // We update the manifest referenced by this position // This updates an existing file so there is no need to create a snapshot entry. if let Some(pos) = pos { let info = &mut manifests[pos]; - let path = partition_path(stream_name, info.time_lower_bound, info.time_upper_bound); - let Some(mut manifest) = storage.get_manifest(&path).await? else { + let manifest_path = partition_path(stream_name, info.time_lower_bound, info.time_upper_bound); + + let Some(mut manifest) = storage.get_manifest(&manifest_path).await? else { return Err(ObjectStorageError::UnhandledError( "Manifest found in snapshot but not in object-storage" .to_string() @@ -118,9 +145,9 @@ pub async fn update_snapshot( )); }; manifest.apply_change(change); - storage.put_manifest(&path, manifest).await?; + storage.put_manifest(&manifest_path, manifest).await?; } else { - let lower_bound = lower_bound.date_naive().and_time(NaiveTime::MIN).and_utc(); + let lower_bound = _lower_bound.date_naive().and_time(NaiveTime::MIN).and_utc(); let upper_bound = lower_bound .date_naive() .and_time( @@ -138,6 +165,9 @@ pub async fn update_snapshot( }; let path = partition_path(stream_name, lower_bound, upper_bound).join("manifest.json"); + // println!("path: {:?}", path); + // let manifest_path = relative_path::RelativePathBuf::from(manifest_file_path).join("manifest.json"); + // println!("manifest_path: {:?}", manifest_path); storage .put_object(&path, serde_json::to_vec(&manifest).unwrap().into()) .await?; @@ -148,7 +178,7 @@ pub async fn update_snapshot( time_upper_bound: upper_bound, }; manifests.push(new_snapshot_entriy); - storage.put_snapshot(stream_name, meta).await?; + storage.put_snapshot(stream_name, meta.snapshot).await?; } Ok(()) @@ -161,12 +191,12 @@ pub async fn remove_manifest_from_snapshot( ) -> Result<(), ObjectStorageError> { // get current snapshot let mut meta = storage.get_snapshot(stream_name).await?; - let manifests = &mut meta.manifest_list; + let manifests = &mut meta.snapshot.manifest_list; // Filter out items whose manifest_path contains any of the dates_to_delete manifests.retain(|item| !dates.iter().any(|date| item.manifest_path.contains(date))); - storage.put_snapshot(stream_name, meta).await?; + storage.put_snapshot(stream_name, meta.snapshot).await?; Ok(()) } @@ -176,7 +206,7 @@ pub async fn get_first_event( ) -> Result, ObjectStorageError> { // get current snapshot let mut meta = storage.get_snapshot(stream_name).await?; - let manifests = &mut meta.manifest_list; + let manifests = &mut meta.snapshot.manifest_list; if manifests.is_empty() { log::info!("No manifest found for stream {stream_name}"); @@ -199,7 +229,7 @@ pub async fn get_first_event( }; if let Some(first_event) = manifest.files.first() { - let (lower_bound, _) = get_file_bounds(first_event); + let (lower_bound, _) = get_file_bounds(first_event, 
DEFAULT_TIMESTAMP_KEY.to_string()); let first_event_at = lower_bound.with_timezone(&Local).to_rfc3339(); return Ok(Some(first_event_at)); } diff --git a/server/src/catalog/manifest.rs b/server/src/catalog/manifest.rs index bafed3dd5..9ecd60e9e 100644 --- a/server/src/catalog/manifest.rs +++ b/server/src/catalog/manifest.rs @@ -112,7 +112,6 @@ pub fn create_from_parquet_file( let columns = column_statistics(row_groups); manifest_file.columns = columns.into_values().collect(); let mut sort_orders = sort_order(row_groups); - if let Some(last_sort_order) = sort_orders.pop() { if sort_orders .into_iter() @@ -131,6 +130,7 @@ fn sort_order( let mut sort_orders = Vec::new(); for row_group in row_groups { let sort_order = row_group.sorting_columns().unwrap(); + println!("sort_order: {:?}", sort_order); let sort_order = sort_order .iter() .map(|sort_order| { @@ -155,7 +155,8 @@ fn sort_order( }) .collect_vec(); - sort_orders.push(sort_order) + sort_orders.push(sort_order); + } sort_orders } diff --git a/server/src/event/writer.rs b/server/src/event/writer.rs index 6b5bbcd3c..1dcb5bc9c 100644 --- a/server/src/event/writer.rs +++ b/server/src/event/writer.rs @@ -30,7 +30,7 @@ use crate::utils; use self::{errors::StreamWriterError, file_writer::FileWriter, mem_writer::MemWriter}; use arrow_array::{RecordBatch, TimestampMillisecondArray}; use arrow_schema::Schema; -use chrono::NaiveDateTime; +use chrono::{NaiveDateTime, Utc}; use derive_more::{Deref, DerefMut}; use once_cell::sync::Lazy; @@ -55,9 +55,9 @@ impl Writer { rb.schema(), &rb, &[0], - &[Arc::new(get_timestamp_array(rb.num_rows(), parsed_timestamp))], + &[Arc::new(get_timestamp_array(rb.num_rows()))], ); - + self.disk.push(stream_name, schema_key, &rb, parsed_timestamp)?; self.mem.push(schema_key, rb); Ok(()) @@ -138,10 +138,8 @@ impl WriterTable { } } -fn get_timestamp_array(size: usize, parsed_timestamp: NaiveDateTime) -> TimestampMillisecondArray { - println!("parsed_timestamp: {:?}", parsed_timestamp); - println!("parsed_timestamp: {:?}", parsed_timestamp.timestamp_millis()); - TimestampMillisecondArray::from_value(parsed_timestamp.timestamp_millis(), size) +fn get_timestamp_array(size: usize) -> TimestampMillisecondArray { + TimestampMillisecondArray::from_value(Utc::now().timestamp_millis(), size) } diff --git a/server/src/event/writer/file_writer.rs b/server/src/event/writer/file_writer.rs index 6d8ea28a0..d976734dc 100644 --- a/server/src/event/writer/file_writer.rs +++ b/server/src/event/writer/file_writer.rs @@ -46,27 +46,14 @@ impl FileWriter { record: &RecordBatch, parsed_timestamp: NaiveDateTime ) -> Result<(), StreamWriterError> { - match self.get_mut(schema_key) { - Some(writer) => { - writer - .writer - .write(record) - .map_err(StreamWriterError::Writer)?; - } - // entry is not present thus we create it - None => { - // this requires mutable borrow of the map so we drop this read lock and wait for write lock - let (path, writer) = init_new_stream_writer_file(stream_name, schema_key, record, parsed_timestamp)?; - println!("path: {:?}", path); - self.insert( - schema_key.to_owned(), - ArrowWriter { - file_path: path, - writer, - }, - ); - } - }; + let (path, writer) = init_new_stream_writer_file(stream_name, schema_key, record, parsed_timestamp)?; + self.insert( + schema_key.to_owned(), + ArrowWriter { + file_path: path, + writer, + }, + ); Ok(()) } @@ -85,9 +72,7 @@ fn init_new_stream_writer_file( parsed_timestamp: NaiveDateTime, ) -> Result<(PathBuf, StreamWriter), StreamWriterError> { let dir = StorageDir::new(stream_name); - 
println!("dir: {:?}", dir); let path = dir.path_by_current_time(schema_key, parsed_timestamp); - println!("path: {:?}", path); std::fs::create_dir_all(dir.data_path)?; let file = OpenOptions::new().create(true).append(true).open(&path)?; diff --git a/server/src/handlers.rs b/server/src/handlers.rs index bc73e0467..456f5fc68 100644 --- a/server/src/handlers.rs +++ b/server/src/handlers.rs @@ -24,8 +24,6 @@ const PREFIX_META: &str = "x-p-meta-"; const STREAM_NAME_HEADER_KEY: &str = "x-p-stream"; const LOG_SOURCE_KEY: &str = "x-p-log-source"; const TIME_PARTITION_KEY: &str = "x-p-time-partition"; -const TIME_PARTITION_FORMAT_KEY: &str = "x-p-time-partition-format"; -const TIME_PARTITION_TIMEZONE_KEY: &str = "x-p-time-partition-timezone"; const AUTHORIZATION_KEY: &str = "authorization"; const SEPARATOR: char = '^'; diff --git a/server/src/handlers/http/ingest.rs b/server/src/handlers/http/ingest.rs index a32f61c1f..c475147fa 100644 --- a/server/src/handlers/http/ingest.rs +++ b/server/src/handlers/http/ingest.rs @@ -107,16 +107,8 @@ async fn push_logs(stream_name: String, req: HttpRequest, body: Bytes) -> Result .ok_or(PostError::StreamNotFound(stream_name.clone()))? .time_partition .clone(); - println!("time_partition: {:?}", time_partition); - - let time_partition_format = hash_map - .get(&stream_name) - .ok_or(PostError::StreamNotFound(stream_name.clone()))? - .time_partition_format - .clone(); - println!("time_partition: {:?}", time_partition_format); - into_event_batch(req, body, schema, time_partition, time_partition_format)? + into_event_batch(req, body, schema, time_partition)? }; event::Event { @@ -138,31 +130,27 @@ fn into_event_batch( body: Bytes, schema: HashMap>, time_partition: Option, - time_partition_format: Option, ) -> Result<(usize, arrow_array::RecordBatch, bool, NaiveDateTime ), PostError> { let tags = collect_labelled_headers(&req, PREFIX_TAGS, SEPARATOR)?; let metadata = collect_labelled_headers(&req, PREFIX_META, SEPARATOR)?; let size = body.len(); let body: Value = serde_json::from_slice(&body)?; - let body_timestamp_format: &str = "%Y-%m-%dT%H:%M:%S%Z"; let mut ingestion_prefix_timestamp = Utc::now().naive_utc(); - if time_partition.is_some() && time_partition_format.is_some(){ + if time_partition.is_some(){ let body_timestamp = body.get(&time_partition.clone().unwrap().to_string()); if body_timestamp.is_some(){ if body_timestamp.unwrap().to_owned().as_str().unwrap().parse::>().is_ok(){ ingestion_prefix_timestamp = body_timestamp.unwrap().to_owned().as_str().unwrap().parse::>().unwrap().naive_utc(); - println!("ingestion_prefix_timestamp: {:?}", ingestion_prefix_timestamp); } else{ - return Err(PostError::Invalid(anyhow::Error::msg(format!("field {} is not in the correct format {}", body_timestamp.unwrap().to_owned().as_str().unwrap(), body_timestamp_format)))); + return Err(PostError::Invalid(anyhow::Error::msg(format!("field {} is not in the correct format", body_timestamp.unwrap().to_owned().as_str().unwrap())))); } }else{ return Err(PostError::Invalid(anyhow::Error::msg(format!("field {} is not part of the log", time_partition.unwrap())))); } } - println!("ingestion_prefix_timestamp: {:?}", ingestion_prefix_timestamp); let event = format::json::Event { data: body, tags, @@ -178,7 +166,7 @@ pub async fn create_stream_if_not_exists(stream_name: &str) -> Result<(), PostEr if STREAM_INFO.stream_exists(stream_name) { return Ok(()); } - super::logstream::create_stream(stream_name.to_string(), String::new(), String::new(), String::new()).await?; + 
super::logstream::create_stream(stream_name.to_string(), String::new()).await?; Ok(()) } @@ -282,7 +270,6 @@ mod tests { Bytes::from(serde_json::to_vec(&json).unwrap()), HashMap::default(), None, - None, ) .unwrap(); @@ -330,7 +317,6 @@ mod tests { Bytes::from(serde_json::to_vec(&json).unwrap()), HashMap::default(), None, - None, ) .unwrap(); @@ -365,8 +351,7 @@ mod tests { let req = TestRequest::default().to_http_request(); let (_, rb, _, _) = - into_event_batch(req, Bytes::from(serde_json::to_vec(&json).unwrap()), schema,None, - None).unwrap(); + into_event_batch(req, Bytes::from(serde_json::to_vec(&json).unwrap()), schema,None,).unwrap(); assert_eq!(rb.num_rows(), 1); assert_eq!(rb.num_columns(), 5); @@ -400,7 +385,7 @@ mod tests { assert!( into_event_batch(req, Bytes::from(serde_json::to_vec(&json).unwrap()), schema,None, - None) + ) .is_err() ); } @@ -422,7 +407,7 @@ mod tests { let (_, rb, _, _) = into_event_batch(req, Bytes::from(serde_json::to_vec(&json).unwrap()), schema,None, - None).unwrap(); + ).unwrap(); assert_eq!(rb.num_rows(), 1); assert_eq!(rb.num_columns(), 3); @@ -439,7 +424,7 @@ mod tests { Bytes::from(serde_json::to_vec(&json).unwrap()), HashMap::default(), None, - None, + ) .is_err()) } @@ -468,7 +453,7 @@ mod tests { req, Bytes::from(serde_json::to_vec(&json).unwrap()), HashMap::default(),None, - None, + ) .unwrap(); @@ -522,7 +507,7 @@ mod tests { req, Bytes::from(serde_json::to_vec(&json).unwrap()), HashMap::default(),None, - None, + ) .unwrap(); @@ -574,7 +559,7 @@ mod tests { let (_, rb, _,_) = into_event_batch(req, Bytes::from(serde_json::to_vec(&json).unwrap()), schema,None, - None).unwrap(); + ).unwrap(); assert_eq!(rb.num_rows(), 3); assert_eq!(rb.num_columns(), 6); @@ -618,7 +603,7 @@ mod tests { req, Bytes::from(serde_json::to_vec(&json).unwrap()), HashMap::default(),None, - None, + ) .unwrap(); @@ -667,7 +652,7 @@ mod tests { assert!( into_event_batch(req, Bytes::from(serde_json::to_vec(&json).unwrap()), schema,None, - None,) + ) .is_err() ); } @@ -701,7 +686,7 @@ mod tests { req, Bytes::from(serde_json::to_vec(&json).unwrap()), HashMap::default(),None, - None, + ) .unwrap(); diff --git a/server/src/handlers/http/logstream.rs b/server/src/handlers/http/logstream.rs index 311dc5e14..7689754c9 100644 --- a/server/src/handlers/http/logstream.rs +++ b/server/src/handlers/http/logstream.rs @@ -30,7 +30,7 @@ use crate::storage::retention::Retention; use crate::storage::{LogStream, StorageDir}; use crate::{catalog, event, stats}; use crate::{metadata, validator}; -use crate::handlers::{TIME_PARTITION_FORMAT_KEY, TIME_PARTITION_KEY, TIME_PARTITION_TIMEZONE_KEY}; +use crate::handlers::TIME_PARTITION_KEY; use self::error::{CreateStreamError, StreamError}; pub async fn delete(req: HttpRequest) -> Result { @@ -111,34 +111,13 @@ pub async fn get_alert(req: HttpRequest) -> Result pub async fn put_stream(req: HttpRequest) -> Result { let mut time_partition: String = String::new(); - let mut time_partition_format: String = String::new(); - let mut time_partition_timezone: String = String::new(); if let Some((_, time_partition_name)) = req .headers() .iter() .find(|&(key, _)| key == TIME_PARTITION_KEY) { time_partition = time_partition_name.to_str().unwrap().to_owned(); - println!("time_partition: {:?}", time_partition); - - if let Some((_, header_time_partition_format)) = req - .headers() - .iter() - .find(|&(key, _)| key == TIME_PARTITION_FORMAT_KEY){ - time_partition_format = header_time_partition_format.to_str().unwrap().to_owned(); - println!("time_partition_format: 
{:?}", time_partition_format); - }else { - return Err(StreamError::TimePartitionFormatMissing(time_partition)); - } - if let Some((_, header_time_partition_timezone)) = req - .headers() - .iter() - .find(|&(key, _)| key == TIME_PARTITION_TIMEZONE_KEY){ - time_partition_timezone = header_time_partition_timezone.to_str().unwrap().to_owned(); - println!("time_partition_timezone: {:?}", time_partition_timezone); - }else { - // return Err(StreamError::TimePartitionTimeZoneMissing(time_partition)); - } + } let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); @@ -151,7 +130,7 @@ pub async fn put_stream(req: HttpRequest) -> Result status: StatusCode::BAD_REQUEST, }); } else { - create_stream(stream_name, time_partition, time_partition_format, time_partition_timezone).await?; + create_stream(stream_name, time_partition).await?; } Ok(("log stream created", StatusCode::OK)) @@ -358,13 +337,13 @@ fn remove_id_from_alerts(value: &mut Value) { } } -pub async fn create_stream(stream_name: String, time_partition: String, time_partition_format: String, time_partition_timezone: String) -> Result<(), CreateStreamError> { +pub async fn create_stream(stream_name: String, time_partition: String) -> Result<(), CreateStreamError> { // fail to proceed if invalid stream name validator::stream_name(&stream_name)?; // Proceed to create log stream if it doesn't exist let storage = CONFIG.storage().get_object_store(); - if let Err(err) = storage.create_stream(&stream_name, &time_partition, &time_partition_format, &time_partition_timezone).await { + if let Err(err) = storage.create_stream(&stream_name, &time_partition).await { return Err(CreateStreamError::Storage { stream_name, err }); } @@ -376,15 +355,8 @@ pub async fn create_stream(stream_name: String, time_partition: String, time_par .await; let stream_meta = stream_meta.unwrap(); let created_at = stream_meta.created_at; - let mut time_partition: String = String::new(); // Initialize time_partition with an empty string - let mut time_partition_format: String = String::new(); - let mut time_partition_timezone: String = String::new(); - if stream_meta.time_partition.is_some() && stream_meta.time_partition_format.is_some() && stream_meta.time_partition_timezone.is_some(){ - time_partition = stream_meta.time_partition.unwrap(); - time_partition_format = stream_meta.time_partition_format.unwrap(); - time_partition_timezone = stream_meta.time_partition_timezone.unwrap(); - } - metadata::STREAM_INFO.add_stream(stream_name.to_string(), created_at, time_partition, time_partition_format, time_partition_timezone); + + metadata::STREAM_INFO.add_stream(stream_name.to_string(), created_at, time_partition); Ok(()) } @@ -444,10 +416,6 @@ pub mod error { InvalidRetentionConfig(serde_json::Error), #[error("{msg}")] Custom { msg: String, status: StatusCode }, - #[error("X-P-Time-Partition-Format header is missing for \"{0}\" X-P-Time-Partition header")] - TimePartitionFormatMissing(String), - #[error("X-P-Time-Partition-TimeZone header is missing for \"{0}\" X-P-Time-Partition header")] - TimePartitionTimeZoneMissing(String), } impl actix_web::ResponseError for StreamError { @@ -470,8 +438,6 @@ pub mod error { StreamError::InvalidAlert(_) => StatusCode::BAD_REQUEST, StreamError::InvalidAlertMessage(_, _) => StatusCode::BAD_REQUEST, StreamError::InvalidRetentionConfig(_) => StatusCode::BAD_REQUEST, - StreamError::TimePartitionFormatMissing(_) => StatusCode::BAD_REQUEST, - StreamError::TimePartitionTimeZoneMissing(_) => StatusCode::BAD_REQUEST } } diff 
--git a/server/src/handlers/http/query.rs b/server/src/handlers/http/query.rs index af5120c49..16b24b69b 100644 --- a/server/src/handlers/http/query.rs +++ b/server/src/handlers/http/query.rs @@ -56,7 +56,7 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result Result, pub time_partition: Option, - pub time_partition_format: Option, - pub time_partition_timezone: Option } // It is very unlikely that panic will occur when dealing with metadata. @@ -145,7 +143,7 @@ impl StreamInfo { }) } - pub fn add_stream(&self, stream_name: String, created_at: String, time_partition: String, time_partition_format: String, time_partition_timezone: String) { + pub fn add_stream(&self, stream_name: String, created_at: String, time_partition: String) { let mut map = self.write().expect(LOCK_EXPECT); let metadata = LogStreamMetadata { created_at: if created_at.is_empty() { @@ -158,16 +156,6 @@ impl StreamInfo { } else { Some(time_partition.clone()) }, - time_partition_format: if time_partition_format.is_empty() { - None - } else { - Some(time_partition_format.clone()) - }, - time_partition_timezone: if time_partition_timezone.is_empty() { - None - } else { - Some(time_partition_timezone.clone()) - }, ..Default::default() }; map.insert(stream_name, metadata); @@ -204,8 +192,6 @@ impl StreamInfo { created_at: meta.created_at, first_event_at: meta.first_event_at, time_partition: meta.time_partition, - time_partition_format: meta.time_partition_format, - time_partition_timezone: meta.time_partition_timezone }; let mut map = self.write().expect(LOCK_EXPECT); diff --git a/server/src/query.rs b/server/src/query.rs index e3f9d8dbc..899545c6c 100644 --- a/server/src/query.rs +++ b/server/src/query.rs @@ -43,7 +43,7 @@ use crate::option::CONFIG; use crate::storage::{ObjectStorageProvider, StorageDir}; use self::error::ExecuteError; - +use crate::metadata::STREAM_INFO; use self::stream_schema_provider::GlobalSchemaProvider; pub use self::stream_schema_provider::PartialTimeFilter; @@ -51,6 +51,7 @@ pub static QUERY_SESSION: Lazy = Lazy::new(|| Query::create_session_context(CONFIG.storage())); // A query request by client +#[derive(Debug)] pub struct Query { pub raw_logical_plan: LogicalPlan, pub start: DateTime, @@ -102,10 +103,11 @@ impl Query { SessionContext::new_with_state(state) } - pub async fn execute(&self) -> Result<(Vec, Vec), ExecuteError> { + pub async fn execute(&self, stream_name: String) -> Result<(Vec, Vec), ExecuteError> { let df = QUERY_SESSION - .execute_logical_plan(self.final_logical_plan()) + .execute_logical_plan(self.final_logical_plan(stream_name)) .await?; + println!("df: {:?}", df); let fields = df .schema() @@ -120,8 +122,11 @@ impl Query { } /// return logical plan with all time filters applied through - fn final_logical_plan(&self) -> LogicalPlan { + fn final_logical_plan(&self, stream_name: String) -> LogicalPlan { let filters = self.filter_tag.clone().and_then(tag_filter); + println!("filters: {:?}", filters); + println!("start time: {:?}", self.start.naive_utc()); + println!("end time: {:?}", self.end.naive_utc()); // see https://github.com/apache/arrow-datafusion/pull/8400 // this can be eliminated in later version of datafusion but with slight caveat // transform cannot modify stringified plans by itself @@ -133,7 +138,9 @@ impl Query { self.start.naive_utc(), self.end.naive_utc(), filters, + stream_name ); + println!("transformed : {:?}", transformed); LogicalPlan::Explain(Explain { verbose: plan.verbose, stringified_plans: vec![ @@ -144,7 +151,7 @@ impl Query { 
logical_optimization_succeeded: plan.logical_optimization_succeeded, }) } - x => transform(x, self.start.naive_utc(), self.end.naive_utc(), filters), + x => transform(x, self.start.naive_utc(), self.end.naive_utc(), filters, stream_name), } } @@ -195,22 +202,33 @@ fn transform( start_time: NaiveDateTime, end_time: NaiveDateTime, filters: Option, + stream_name: String ) -> LogicalPlan { plan.transform(&|plan| match plan { LogicalPlan::TableScan(table) => { + let hash_map = STREAM_INFO.read().unwrap(); + // let time_partition = hash_map + // .get(&stream_name) + // .ok_or(PostError::StreamNotFound(stream_name.clone()))? + // .time_partition + // .clone(); + let mut new_filters = vec![]; if !table_contains_any_time_filters(&table) { - let start_time_filter = PartialTimeFilter::Low(std::ops::Bound::Included( + + let start_time_filter = PartialTimeFilter::Low(std::ops::Bound::Included( start_time, )) .binary_expr(Expr::Column(Column::new( Some(table.table_name.to_owned_reference()), - event::DEFAULT_TIMESTAMP_KEY, + // event::DEFAULT_TIMESTAMP_KEY, + "timestamp", ))); let end_time_filter = PartialTimeFilter::High(std::ops::Bound::Excluded(end_time)) .binary_expr(Expr::Column(Column::new( Some(table.table_name.to_owned_reference()), - event::DEFAULT_TIMESTAMP_KEY, + // event::DEFAULT_TIMESTAMP_KEY, + "timestamp", ))); new_filters.push(start_time_filter); new_filters.push(end_time_filter); @@ -219,9 +237,9 @@ fn transform( if let Some(tag_filters) = filters.clone() { new_filters.push(tag_filters) } - + println!("new_filters: {:?}", new_filters); let new_filter = new_filters.into_iter().reduce(and); - + println!("new_filter: {:?}", new_filter); if let Some(new_filter) = new_filter { let filter = Filter::try_new(new_filter, Arc::new(LogicalPlan::TableScan(table))).unwrap(); diff --git a/server/src/query/stream_schema_provider.rs b/server/src/query/stream_schema_provider.rs index 874afca78..f3eead41b 100644 --- a/server/src/query/stream_schema_provider.rs +++ b/server/src/query/stream_schema_provider.rs @@ -287,8 +287,9 @@ impl TableProvider for StandardTableProvider { ) -> Result, DataFusionError> { let mut memory_exec = None; let mut cache_exec = None; - + println!("filters: {:?}", filters); let time_filters = extract_primary_filter(filters); + println!("time_filters: {:?}", time_filters); if time_filters.is_empty() { return Err(DataFusionError::Plan("potentially unbounded query on time range. Table scanning requires atleast one time bound".to_string())); } @@ -314,11 +315,13 @@ impl TableProvider for StandardTableProvider { let glob_storage = CONFIG.storage().get_object_store(); // Fetch snapshot - let snapshot = glob_storage + let object_store_format = glob_storage .get_snapshot(&self.stream) .await .map_err(|err| DataFusionError::Plan(err.to_string()))?; + let snapshot = object_store_format.snapshot; + // Is query timerange is overlapping with older data. 
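// (Editorial note, not part of the patch: the snapshot used here is taken from
// object_store_format.snapshot, so the manifest bounds checked below may have
// been computed from the stream's time-partition column rather than p_timestamp;
// see get_file_bounds in catalog.rs.)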
if is_overlapping_query(&snapshot.manifest_list, &time_filters) { return legacy_listing_table( @@ -512,27 +515,27 @@ impl PartialTimeFilter { pub fn binary_expr(&self, left: Expr) -> Expr { let (op, right) = match self { PartialTimeFilter::Low(Bound::Excluded(time)) => { - (Operator::Gt, time.timestamp_millis()) + (Operator::Gt, time) } PartialTimeFilter::Low(Bound::Included(time)) => { - (Operator::GtEq, time.timestamp_millis()) + (Operator::GtEq, time) } PartialTimeFilter::High(Bound::Excluded(time)) => { - (Operator::Lt, time.timestamp_millis()) + (Operator::Lt, time) } PartialTimeFilter::High(Bound::Included(time)) => { - (Operator::LtEq, time.timestamp_millis()) + (Operator::LtEq, time) } - PartialTimeFilter::Eq(time) => (Operator::Eq, time.timestamp_millis()), + PartialTimeFilter::Eq(time) => (Operator::Eq, time), _ => unimplemented!(), }; + Expr::BinaryExpr(BinaryExpr::new( Box::new(left), op, - Box::new(Expr::Literal(ScalarValue::TimestampMillisecond( - Some(right), - None, + Box::new(Expr::Literal(ScalarValue::Utf8( + Some(format!("{:?}", right)), ))), )) } @@ -613,11 +616,18 @@ fn expr_in_boundary(filter: &Expr) -> bool { } fn extract_from_lit(expr: &Expr) -> Option { + println!("expr: {:?}", expr); if let Expr::Literal(value) = expr { + println!("value: {:?}", value); match value { ScalarValue::TimestampMillisecond(Some(value), _) => { Some(NaiveDateTime::from_timestamp_millis(*value).unwrap()) + }, + ScalarValue::Utf8(Some(str_value)) => { + println!("str_value: {:?}", str_value); + Some(str_value.parse::().unwrap()) } + _ => None, } } else { @@ -626,13 +636,20 @@ fn extract_from_lit(expr: &Expr) -> Option { } fn extract_timestamp_bound(binexpr: &BinaryExpr) -> Option<(Operator, NaiveDateTime)> { - if matches!(&*binexpr.left, Expr::Column(Column { name, .. }) if name == DEFAULT_TIMESTAMP_KEY) - { - let time = extract_from_lit(&binexpr.right)?; - Some((binexpr.op, time)) - } else { - None - } + println!("binexpr: {:?}", binexpr); + println!("binexpr.left: {:?}", binexpr.left); + println!("binexpr.right: {:?}", binexpr.right); + + let time = extract_from_lit(&binexpr.right)?; + println!("time: {:?}", time); + Some((binexpr.op, time)) + // if matches!(&*binexpr.left, Expr::Column(Column { name, .. 
}) if name == DEFAULT_TIMESTAMP_KEY) + // { + // let time = extract_from_lit(&binexpr.right)?; + // Some((binexpr.op, time)) + // } else { + // None + // } } async fn collect_manifest_files( diff --git a/server/src/storage.rs b/server/src/storage.rs index 68cdccfee..5602b1984 100644 --- a/server/src/storage.rs +++ b/server/src/storage.rs @@ -84,10 +84,6 @@ pub struct ObjectStoreFormat { pub retention: Option, #[serde(skip_serializing_if = "Option::is_none")] pub time_partition: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub time_partition_format: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub time_partition_timezone: Option, } #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] @@ -133,8 +129,6 @@ impl Default for ObjectStoreFormat { cache_enabled: false, retention: None, time_partition: None, - time_partition_format: None, - time_partition_timezone: None, } } } diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs index 3af989728..7e58e9072 100644 --- a/server/src/storage/object_storage.rs +++ b/server/src/storage/object_storage.rs @@ -106,7 +106,7 @@ pub trait ObjectStorage: Sync + 'static { Ok(()) } - async fn create_stream(&self, stream_name: &str, time_partition: &str, time_partition_format: &str, time_partition_timezone: &str) -> Result<(), ObjectStorageError> { + async fn create_stream(&self, stream_name: &str, time_partition: &str) -> Result<(), ObjectStorageError> { let mut format = ObjectStoreFormat::default(); format.set_id(CONFIG.parseable.username.clone()); let permission = Permisssion::new(CONFIG.parseable.username.clone()); @@ -116,16 +116,7 @@ pub trait ObjectStorage: Sync + 'static { } else { format.time_partition = Some(time_partition.to_string()); } - if time_partition_format.is_empty() { - format.time_partition_format = None; - } else { - format.time_partition_format = Some(time_partition_format.to_string()); - } - if time_partition_timezone.is_empty() { - format.time_partition_timezone = None; - } else { - format.time_partition_timezone = Some(time_partition_timezone.to_string()); - } + let format_json = to_bytes(&format); self.put_object(&schema_path(stream_name), to_bytes(&Schema::empty())) .await?; @@ -283,6 +274,7 @@ pub trait ObjectStorage: Sync + 'static { path: &RelativePath, ) -> Result, ObjectStorageError> { let path = manifest_path(path.as_str()); + println!("path: {:?}", path); match self.get_object(&path).await { Ok(bytes) => Ok(Some( serde_json::from_slice(&bytes).expect("manifest is valid json"), @@ -306,12 +298,13 @@ pub trait ObjectStorage: Sync + 'static { self.put_object(&path, to_bytes(&manifest)).await } - async fn get_snapshot(&self, stream: &str) -> Result { + + + async fn get_snapshot(&self, stream: &str) -> Result { let path = stream_json_path(stream); let bytes = self.get_object(&path).await?; Ok(serde_json::from_slice::(&bytes) - .expect("snapshot is valid json") - .snapshot) + .expect("snapshot is valid json")) } async fn put_snapshot( @@ -369,10 +362,11 @@ pub trait ObjectStorage: Sync + 'static { let absolute_path = self .absolute_url(RelativePath::from_path(&stream_relative_path).unwrap()) .to_string(); + let manifest_file_path = get_stream_manifest_path(stream_relative_path); let store = CONFIG.storage().get_object_store(); let manifest = catalog::create_from_parquet_file(absolute_path.clone(), &file).unwrap(); - catalog::update_snapshot(store, stream, manifest).await?; + catalog::update_snapshot(store, stream, 
manifest,&manifest_file_path).await?; if cache_enabled && cache_manager.is_some() { cache_updates .entry(stream) @@ -418,6 +412,7 @@ pub trait ObjectStorage: Sync + 'static { } } + async fn commit_schema_to_storage( stream_name: &str, schema: Schema, @@ -459,3 +454,9 @@ fn alert_json_path(stream_name: &str) -> RelativePathBuf { fn manifest_path(prefix: &str) -> RelativePathBuf { RelativePathBuf::from_iter([prefix, MANIFEST_FILE]) } + +fn get_stream_manifest_path(stream_relative_path: String) -> String{ + let v: Vec<&str> = stream_relative_path.split("/").collect(); + let manifest_path = format!("{}/{}", v[0], v[1]); + manifest_path +} diff --git a/server/src/storage/staging.rs b/server/src/storage/staging.rs index 5644d8828..5413ce5fd 100644 --- a/server/src/storage/staging.rs +++ b/server/src/storage/staging.rs @@ -43,7 +43,7 @@ use crate::{ storage::OBJECT_STORE_DATA_GRANULARITY, utils::{self, arrow::merged_reader::MergedReverseRecordReader}, }; - +use rand::Rng; const ARROW_FILE_EXTENSION: &str = "data.arrows"; const PARQUET_FILE_EXTENSION: &str = "data.parquet"; @@ -65,7 +65,15 @@ impl StorageDir { + &utils::minute_to_prefix(time.minute(), OBJECT_STORE_DATA_GRANULARITY).unwrap(); let local_uri = str::replace(&uri, "/", "."); let hostname = utils::hostname_unchecked(); - format!("{local_uri}{hostname}.{extention}") + if extention == PARQUET_FILE_EXTENSION { + let mut rng = rand::thread_rng(); + let n: u64 = rng.gen(); + format!("{local_uri}{hostname}{n}.{extention}") + } + else{ + format!("{local_uri}{hostname}.{extention}") + } + } fn filename_by_time(stream_hash: &str, time: NaiveDateTime) -> String { @@ -156,10 +164,13 @@ impl StorageDir { } fn arrow_path_to_parquet(path: &Path) -> PathBuf { - let filename = path.file_name().unwrap().to_str().unwrap(); - let (_, filename) = filename.split_once('.').unwrap(); + let file_stem = path.file_stem().unwrap().to_str().unwrap(); + let mut rng = rand::thread_rng(); + let n: u64 = rng.gen(); + let (_, filename) = file_stem.split_once('.').unwrap(); + let filename_with_random = format!("{}.{}.{}",filename, n, "arrows"); let mut parquet_path = path.to_owned(); - parquet_path.set_file_name(filename); + parquet_path.set_file_name(filename_with_random); parquet_path.set_extension("parquet"); parquet_path } From 05ac02735a2deb7a1a5e219f1338b8739747f828 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Mon, 4 Mar 2024 23:51:46 +0530 Subject: [PATCH 3/9] add custom header X-P-Time-Partition (optional) at stream creation api -- to allow ingestion/query using timestamp column from the log data instead of server time / p_timestamp store the time_partition field name in stream.json and in memory STREAM_INFO in ingest api - check if timestamp column name exists in the log data, if no, throw exception also, check if timestamp value can be parsed into datetime, if no, throw exception arrow file name gets the date, hr, mm from the timestamp field (if defined in stream) else file name gets the date, hr, mm from the server time parquet file name gets a random number attached to it -- this is because a lot of log data can have same date, hr, mm value of the timestamp field and with this random number, parquet will not get overwritten in the console, query from and to date will be matched against the value of the timestamp column of the log data (if defined in the stream), else from and to date will be matched against the p_timestamp column --- server/src/catalog.rs | 101 +++++++++-------- server/src/catalog/manifest.rs | 2 - server/src/event.rs | 11 +- 
server/src/event/format.rs | 4 +- server/src/event/writer.rs | 31 ++--- server/src/event/writer/file_writer.rs | 13 ++- server/src/handlers/http/ingest.rs | 125 +++++++++++++-------- server/src/handlers/http/logstream.rs | 27 ++--- server/src/handlers/http/query.rs | 1 - server/src/metadata.rs | 4 +- server/src/query.rs | 93 +++++++++------ server/src/query/stream_schema_provider.rs | 65 +++++------ server/src/storage/object_storage.rs | 30 ++--- server/src/storage/staging.rs | 28 +++-- 14 files changed, 295 insertions(+), 240 deletions(-) diff --git a/server/src/catalog.rs b/server/src/catalog.rs index a1face02c..c9502d1ef 100644 --- a/server/src/catalog.rs +++ b/server/src/catalog.rs @@ -22,7 +22,10 @@ use chrono::{DateTime, Local, NaiveDateTime, NaiveTime, Utc}; use relative_path::RelativePathBuf; use crate::{ - catalog::manifest::Manifest, event::DEFAULT_TIMESTAMP_KEY, query::PartialTimeFilter, storage::{ObjectStorage, ObjectStorageError} + catalog::manifest::Manifest, + event::DEFAULT_TIMESTAMP_KEY, + query::PartialTimeFilter, + storage::{ObjectStorage, ObjectStorageError}, }; use self::{column::Column, snapshot::ManifestItem}; @@ -67,64 +70,62 @@ impl ManifestFile for manifest::File { } } -fn get_file_bounds(file: &manifest::File, partition_column: String) -> (DateTime, DateTime) { - if partition_column == DEFAULT_TIMESTAMP_KEY.to_string(){ +fn get_file_bounds( + file: &manifest::File, + partition_column: String, +) -> (DateTime, DateTime) { + if partition_column == *DEFAULT_TIMESTAMP_KEY.to_string() { match file - .columns() - .iter() - .find(|col| col.name == partition_column) - .unwrap() - .stats - .clone() - .unwrap() - { - column::TypedStatistics::Int(stats) => ( - NaiveDateTime::from_timestamp_millis(stats.min) - .unwrap() - .and_utc(), - NaiveDateTime::from_timestamp_millis(stats.max) - .unwrap() - .and_utc(), - ), - _ => unreachable!(), + .columns() + .iter() + .find(|col| col.name == partition_column) + .unwrap() + .stats + .clone() + .unwrap() + { + column::TypedStatistics::Int(stats) => ( + NaiveDateTime::from_timestamp_millis(stats.min) + .unwrap() + .and_utc(), + NaiveDateTime::from_timestamp_millis(stats.max) + .unwrap() + .and_utc(), + ), + _ => unreachable!(), + } + } else { + match file + .columns() + .iter() + .find(|col| col.name == partition_column) + .unwrap() + .stats + .clone() + .unwrap() + { + column::TypedStatistics::String(stats) => ( + stats.min.parse::>().unwrap(), + stats.max.parse::>().unwrap(), + ), + _ => unreachable!(), + } } -} -else{ - match file - .columns() - .iter() - .find(|col| col.name == partition_column) - .unwrap() - .stats - .clone() - .unwrap() -{ - column::TypedStatistics::String(stats) => ( - stats.min.parse::>().unwrap(), - stats.max.parse::>().unwrap(), - ), - _ => unreachable!(), -} -} - - } pub async fn update_snapshot( storage: Arc, stream_name: &str, change: manifest::File, - manifest_file_path: &str, ) -> Result<(), ObjectStorageError> { // get current snapshot - let mut meta = storage.get_snapshot(stream_name).await?; + let mut meta = storage.get_object_store_format(stream_name).await?; let manifests = &mut meta.snapshot.manifest_list; let time_partition = meta.time_partition; let mut _lower_bound = Utc::now(); if time_partition.is_none() { (_lower_bound, _) = get_file_bounds(&change, DEFAULT_TIMESTAMP_KEY.to_string()); - } - else{ + } else { (_lower_bound, _) = get_file_bounds(&change, time_partition.unwrap().to_string()); } let pos = manifests.iter().position(|item| { @@ -135,8 +136,9 @@ pub async fn update_snapshot( // This 
updates an existing file so there is no need to create a snapshot entry. if let Some(pos) = pos { let info = &mut manifests[pos]; - let manifest_path = partition_path(stream_name, info.time_lower_bound, info.time_upper_bound); - + let manifest_path = + partition_path(stream_name, info.time_lower_bound, info.time_upper_bound); + let Some(mut manifest) = storage.get_manifest(&manifest_path).await? else { return Err(ObjectStorageError::UnhandledError( "Manifest found in snapshot but not in object-storage" @@ -165,9 +167,6 @@ pub async fn update_snapshot( }; let path = partition_path(stream_name, lower_bound, upper_bound).join("manifest.json"); - // println!("path: {:?}", path); - // let manifest_path = relative_path::RelativePathBuf::from(manifest_file_path).join("manifest.json"); - // println!("manifest_path: {:?}", manifest_path); storage .put_object(&path, serde_json::to_vec(&manifest).unwrap().into()) .await?; @@ -190,7 +189,7 @@ pub async fn remove_manifest_from_snapshot( dates: Vec, ) -> Result<(), ObjectStorageError> { // get current snapshot - let mut meta = storage.get_snapshot(stream_name).await?; + let mut meta = storage.get_object_store_format(stream_name).await?; let manifests = &mut meta.snapshot.manifest_list; // Filter out items whose manifest_path contains any of the dates_to_delete @@ -205,7 +204,7 @@ pub async fn get_first_event( stream_name: &str, ) -> Result, ObjectStorageError> { // get current snapshot - let mut meta = storage.get_snapshot(stream_name).await?; + let mut meta = storage.get_object_store_format(stream_name).await?; let manifests = &mut meta.snapshot.manifest_list; if manifests.is_empty() { diff --git a/server/src/catalog/manifest.rs b/server/src/catalog/manifest.rs index 9ecd60e9e..ad5b32422 100644 --- a/server/src/catalog/manifest.rs +++ b/server/src/catalog/manifest.rs @@ -130,7 +130,6 @@ fn sort_order( let mut sort_orders = Vec::new(); for row_group in row_groups { let sort_order = row_group.sorting_columns().unwrap(); - println!("sort_order: {:?}", sort_order); let sort_order = sort_order .iter() .map(|sort_order| { @@ -156,7 +155,6 @@ fn sort_order( .collect_vec(); sort_orders.push(sort_order); - } sort_orders } diff --git a/server/src/event.rs b/server/src/event.rs index 0b118ea08..c517cdbda 100644 --- a/server/src/event.rs +++ b/server/src/event.rs @@ -26,10 +26,10 @@ use itertools::Itertools; use std::sync::Arc; -use crate::metadata; -use chrono::NaiveDateTime; use self::error::EventError; pub use self::writer::STREAM_WRITERS; +use crate::metadata; +use chrono::NaiveDateTime; pub const DEFAULT_TIMESTAMP_KEY: &str = "p_timestamp"; pub const DEFAULT_TAGS_KEY: &str = "p_tags"; @@ -55,7 +55,12 @@ impl Event { commit_schema(&self.stream_name, self.rb.schema())?; } - Self::process_event(&self.stream_name, &key, self.rb.clone(), self.parsed_timestamp)?; + Self::process_event( + &self.stream_name, + &key, + self.rb.clone(), + self.parsed_timestamp, + )?; metadata::STREAM_INFO.update_stats( &self.stream_name, diff --git a/server/src/event/format.rs b/server/src/event/format.rs index 55f8e106a..cd11e440b 100644 --- a/server/src/event/format.rs +++ b/server/src/event/format.rs @@ -53,14 +53,14 @@ pub trait EventFormat: Sized { return Err(anyhow!("field {} is a reserved field", DEFAULT_TAGS_KEY)); }; - if get_field(&schema, DEFAULT_TAGS_KEY).is_some() { + if get_field(&schema, DEFAULT_METADATA_KEY).is_some() { return Err(anyhow!( "field {} is a reserved field", DEFAULT_METADATA_KEY )); }; - if get_field(&schema, DEFAULT_TAGS_KEY).is_some() { + if 
get_field(&schema, DEFAULT_TIMESTAMP_KEY).is_some() { return Err(anyhow!( "field {} is a reserved field", DEFAULT_TIMESTAMP_KEY diff --git a/server/src/event/writer.rs b/server/src/event/writer.rs index 1dcb5bc9c..f1befd061 100644 --- a/server/src/event/writer.rs +++ b/server/src/event/writer.rs @@ -48,17 +48,17 @@ impl Writer { stream_name: &str, schema_key: &str, rb: RecordBatch, - parsed_timestamp: NaiveDateTime + parsed_timestamp: NaiveDateTime, ) -> Result<(), StreamWriterError> { - let rb = utils::arrow::replace_columns( rb.schema(), &rb, &[0], &[Arc::new(get_timestamp_array(rb.num_rows()))], ); - - self.disk.push(stream_name, schema_key, &rb, parsed_timestamp)?; + + self.disk + .push(stream_name, schema_key, &rb, parsed_timestamp)?; self.mem.push(schema_key, rb); Ok(()) } @@ -74,16 +74,18 @@ impl WriterTable { stream_name: &str, schema_key: &str, record: RecordBatch, - parsed_timestamp: NaiveDateTime + parsed_timestamp: NaiveDateTime, ) -> Result<(), StreamWriterError> { let hashmap_guard = self.read().unwrap(); match hashmap_guard.get(stream_name) { Some(stream_writer) => { - stream_writer - .lock() - .unwrap() - .push(stream_name, schema_key, record, parsed_timestamp)?; + stream_writer.lock().unwrap().push( + stream_name, + schema_key, + record, + parsed_timestamp, + )?; } None => { drop(hashmap_guard); @@ -91,10 +93,12 @@ impl WriterTable { // check for race condition // if map contains entry then just if let Some(writer) = map.get(stream_name) { - writer - .lock() - .unwrap() - .push(stream_name, schema_key, record, parsed_timestamp)?; + writer.lock().unwrap().push( + stream_name, + schema_key, + record, + parsed_timestamp, + )?; } else { let mut writer = Writer::default(); writer.push(stream_name, schema_key, record, parsed_timestamp)?; @@ -140,7 +144,6 @@ impl WriterTable { fn get_timestamp_array(size: usize) -> TimestampMillisecondArray { TimestampMillisecondArray::from_value(Utc::now().timestamp_millis(), size) - } pub mod errors { diff --git a/server/src/event/writer/file_writer.rs b/server/src/event/writer/file_writer.rs index d976734dc..47590a119 100644 --- a/server/src/event/writer/file_writer.rs +++ b/server/src/event/writer/file_writer.rs @@ -17,13 +17,13 @@ * */ -use std::collections::HashMap; -use std::fs::{File, OpenOptions}; -use std::path::PathBuf; -use chrono::NaiveDateTime; use arrow_array::RecordBatch; use arrow_ipc::writer::StreamWriter; +use chrono::NaiveDateTime; use derive_more::{Deref, DerefMut}; +use std::collections::HashMap; +use std::fs::{File, OpenOptions}; +use std::path::PathBuf; use crate::storage::staging::StorageDir; @@ -44,9 +44,10 @@ impl FileWriter { stream_name: &str, schema_key: &str, record: &RecordBatch, - parsed_timestamp: NaiveDateTime + parsed_timestamp: NaiveDateTime, ) -> Result<(), StreamWriterError> { - let (path, writer) = init_new_stream_writer_file(stream_name, schema_key, record, parsed_timestamp)?; + let (path, writer) = + init_new_stream_writer_file(stream_name, schema_key, record, parsed_timestamp)?; self.insert( schema_key.to_owned(), ArrowWriter { diff --git a/server/src/handlers/http/ingest.rs b/server/src/handlers/http/ingest.rs index c475147fa..633c493b1 100644 --- a/server/src/handlers/http/ingest.rs +++ b/server/src/handlers/http/ingest.rs @@ -19,11 +19,11 @@ use actix_web::{http::header::ContentType, HttpRequest, HttpResponse}; use arrow_schema::Field; use bytes::Bytes; +use chrono::{DateTime, NaiveDateTime, Utc}; use http::StatusCode; use serde_json::Value; use std::collections::{BTreeMap, HashMap}; use 
std::sync::Arc; -use chrono::{DateTime, Utc, NaiveDateTime}; use crate::event::error::EventError; use crate::event::format::EventFormat; @@ -107,7 +107,7 @@ async fn push_logs(stream_name: String, req: HttpRequest, body: Bytes) -> Result .ok_or(PostError::StreamNotFound(stream_name.clone()))? .time_partition .clone(); - + into_event_batch(req, body, schema, time_partition)? }; @@ -123,39 +123,56 @@ async fn push_logs(stream_name: String, req: HttpRequest, body: Bytes) -> Result .await?; Ok(()) -} +} fn into_event_batch( req: HttpRequest, body: Bytes, schema: HashMap>, time_partition: Option, -) -> Result<(usize, arrow_array::RecordBatch, bool, NaiveDateTime ), PostError> { +) -> Result<(usize, arrow_array::RecordBatch, bool, NaiveDateTime), PostError> { let tags = collect_labelled_headers(&req, PREFIX_TAGS, SEPARATOR)?; let metadata = collect_labelled_headers(&req, PREFIX_META, SEPARATOR)?; let size = body.len(); let body: Value = serde_json::from_slice(&body)?; let mut ingestion_prefix_timestamp = Utc::now().naive_utc(); - if time_partition.is_some(){ + + if time_partition.is_some() { let body_timestamp = body.get(&time_partition.clone().unwrap().to_string()); - if body_timestamp.is_some(){ - if body_timestamp.unwrap().to_owned().as_str().unwrap().parse::>().is_ok(){ - ingestion_prefix_timestamp = body_timestamp.unwrap().to_owned().as_str().unwrap().parse::>().unwrap().naive_utc(); - - } - else{ - return Err(PostError::Invalid(anyhow::Error::msg(format!("field {} is not in the correct format", body_timestamp.unwrap().to_owned().as_str().unwrap())))); + if body_timestamp.is_some() { + if body_timestamp + .unwrap() + .to_owned() + .as_str() + .unwrap() + .parse::>() + .is_ok() + { + ingestion_prefix_timestamp = body_timestamp + .unwrap() + .to_owned() + .as_str() + .unwrap() + .parse::>() + .unwrap() + .naive_utc(); + } else { + return Err(PostError::Invalid(anyhow::Error::msg(format!( + "field {} is not in the correct datetime format", + body_timestamp.unwrap().to_owned().as_str().unwrap() + )))); } - - }else{ - return Err(PostError::Invalid(anyhow::Error::msg(format!("field {} is not part of the log", time_partition.unwrap())))); + } else { + return Err(PostError::Invalid(anyhow::Error::msg(format!( + "ingestion failed as field {} is not part of the log", + time_partition.unwrap() + )))); } } let event = format::json::Event { data: body, tags, metadata, - }; let (rb, is_first) = event.into_recordbatch(schema)?; Ok((size, rb, is_first, ingestion_prefix_timestamp)) @@ -350,8 +367,13 @@ mod tests { let req = TestRequest::default().to_http_request(); - let (_, rb, _, _) = - into_event_batch(req, Bytes::from(serde_json::to_vec(&json).unwrap()), schema,None,).unwrap(); + let (_, rb, _, _) = into_event_batch( + req, + Bytes::from(serde_json::to_vec(&json).unwrap()), + schema, + None, + ) + .unwrap(); assert_eq!(rb.num_rows(), 1); assert_eq!(rb.num_columns(), 5); @@ -383,11 +405,13 @@ mod tests { let req = TestRequest::default().to_http_request(); - assert!( - into_event_batch(req, Bytes::from(serde_json::to_vec(&json).unwrap()), schema,None, - ) - .is_err() - ); + assert!(into_event_batch( + req, + Bytes::from(serde_json::to_vec(&json).unwrap()), + schema, + None, + ) + .is_err()); } #[test] @@ -405,9 +429,13 @@ mod tests { let req = TestRequest::default().to_http_request(); - let (_, rb, _, _) = - into_event_batch(req, Bytes::from(serde_json::to_vec(&json).unwrap()), schema,None, - ).unwrap(); + let (_, rb, _, _) = into_event_batch( + req, + Bytes::from(serde_json::to_vec(&json).unwrap()), + 
schema, + None, + ) + .unwrap(); assert_eq!(rb.num_rows(), 1); assert_eq!(rb.num_columns(), 3); @@ -424,7 +452,6 @@ mod tests { Bytes::from(serde_json::to_vec(&json).unwrap()), HashMap::default(), None, - ) .is_err()) } @@ -452,8 +479,8 @@ mod tests { let (_, rb, _, _) = into_event_batch( req, Bytes::from(serde_json::to_vec(&json).unwrap()), - HashMap::default(),None, - + HashMap::default(), + None, ) .unwrap(); @@ -503,11 +530,11 @@ mod tests { let req = TestRequest::default().to_http_request(); - let (_, rb, _,_) = into_event_batch( + let (_, rb, _, _) = into_event_batch( req, Bytes::from(serde_json::to_vec(&json).unwrap()), - HashMap::default(),None, - + HashMap::default(), + None, ) .unwrap(); @@ -557,9 +584,13 @@ mod tests { ); let req = TestRequest::default().to_http_request(); - let (_, rb, _,_) = - into_event_batch(req, Bytes::from(serde_json::to_vec(&json).unwrap()), schema,None, - ).unwrap(); + let (_, rb, _, _) = into_event_batch( + req, + Bytes::from(serde_json::to_vec(&json).unwrap()), + schema, + None, + ) + .unwrap(); assert_eq!(rb.num_rows(), 3); assert_eq!(rb.num_columns(), 6); @@ -599,11 +630,11 @@ mod tests { let req = TestRequest::default().to_http_request(); - let (_, rb, _,_) = into_event_batch( + let (_, rb, _, _) = into_event_batch( req, Bytes::from(serde_json::to_vec(&json).unwrap()), - HashMap::default(),None, - + HashMap::default(), + None, ) .unwrap(); @@ -650,11 +681,13 @@ mod tests { .into_iter(), ); - assert!( - into_event_batch(req, Bytes::from(serde_json::to_vec(&json).unwrap()), schema,None, - ) - .is_err() - ); + assert!(into_event_batch( + req, + Bytes::from(serde_json::to_vec(&json).unwrap()), + schema, + None, + ) + .is_err()); } #[test] @@ -682,11 +715,11 @@ mod tests { let req = TestRequest::default().to_http_request(); - let (_, rb, _,_) = into_event_batch( + let (_, rb, _, _) = into_event_batch( req, Bytes::from(serde_json::to_vec(&json).unwrap()), - HashMap::default(),None, - + HashMap::default(), + None, ) .unwrap(); diff --git a/server/src/handlers/http/logstream.rs b/server/src/handlers/http/logstream.rs index 7689754c9..c6aa8e2ff 100644 --- a/server/src/handlers/http/logstream.rs +++ b/server/src/handlers/http/logstream.rs @@ -23,15 +23,14 @@ use actix_web::{web, HttpRequest, Responder}; use chrono::Utc; use serde_json::Value; +use self::error::{CreateStreamError, StreamError}; use crate::alerts::Alerts; +use crate::handlers::TIME_PARTITION_KEY; use crate::metadata::STREAM_INFO; use crate::option::CONFIG; -use crate::storage::retention::Retention; -use crate::storage::{LogStream, StorageDir}; +use crate::storage::{retention::Retention, LogStream, StorageDir}; use crate::{catalog, event, stats}; use crate::{metadata, validator}; -use crate::handlers::TIME_PARTITION_KEY; -use self::error::{CreateStreamError, StreamError}; pub async fn delete(req: HttpRequest) -> Result { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); @@ -117,7 +116,6 @@ pub async fn put_stream(req: HttpRequest) -> Result .find(|&(key, _)| key == TIME_PARTITION_KEY) { time_partition = time_partition_name.to_str().unwrap().to_owned(); - } let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); @@ -337,7 +335,10 @@ fn remove_id_from_alerts(value: &mut Value) { } } -pub async fn create_stream(stream_name: String, time_partition: String) -> Result<(), CreateStreamError> { +pub async fn create_stream( + stream_name: String, + time_partition: String, +) -> Result<(), CreateStreamError> { // fail to proceed if 
invalid stream name validator::stream_name(&stream_name)?; @@ -347,15 +348,15 @@ pub async fn create_stream(stream_name: String, time_partition: String) -> Resul return Err(CreateStreamError::Storage { stream_name, err }); } - - let stream_meta: Result = CONFIG - .storage() - .get_object_store() - .get_stream_metadata(&stream_name) - .await; + let stream_meta: Result = + CONFIG + .storage() + .get_object_store() + .get_stream_metadata(&stream_name) + .await; let stream_meta = stream_meta.unwrap(); let created_at = stream_meta.created_at; - + metadata::STREAM_INFO.add_stream(stream_name.to_string(), created_at, time_partition); Ok(()) diff --git a/server/src/handlers/http/query.rs b/server/src/handlers/http/query.rs index 16b24b69b..d7896b944 100644 --- a/server/src/handlers/http/query.rs +++ b/server/src/handlers/http/query.rs @@ -56,7 +56,6 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result = Lazy::new(|| Query::create_session_context(CONFIG.storage())); @@ -103,11 +103,13 @@ impl Query { SessionContext::new_with_state(state) } - pub async fn execute(&self, stream_name: String) -> Result<(Vec, Vec), ExecuteError> { + pub async fn execute( + &self, + stream_name: String, + ) -> Result<(Vec, Vec), ExecuteError> { let df = QUERY_SESSION .execute_logical_plan(self.final_logical_plan(stream_name)) .await?; - println!("df: {:?}", df); let fields = df .schema() @@ -124,9 +126,6 @@ impl Query { /// return logical plan with all time filters applied through fn final_logical_plan(&self, stream_name: String) -> LogicalPlan { let filters = self.filter_tag.clone().and_then(tag_filter); - println!("filters: {:?}", filters); - println!("start time: {:?}", self.start.naive_utc()); - println!("end time: {:?}", self.end.naive_utc()); // see https://github.com/apache/arrow-datafusion/pull/8400 // this can be eliminated in later version of datafusion but with slight caveat // transform cannot modify stringified plans by itself @@ -138,9 +137,8 @@ impl Query { self.start.naive_utc(), self.end.naive_utc(), filters, - stream_name + stream_name, ); - println!("transformed : {:?}", transformed); LogicalPlan::Explain(Explain { verbose: plan.verbose, stringified_plans: vec![ @@ -151,7 +149,13 @@ impl Query { logical_optimization_succeeded: plan.logical_optimization_succeeded, }) } - x => transform(x, self.start.naive_utc(), self.end.naive_utc(), filters, stream_name), + x => transform( + x, + self.start.naive_utc(), + self.end.naive_utc(), + filters, + stream_name, + ), } } @@ -202,44 +206,63 @@ fn transform( start_time: NaiveDateTime, end_time: NaiveDateTime, filters: Option, - stream_name: String + stream_name: String, ) -> LogicalPlan { plan.transform(&|plan| match plan { LogicalPlan::TableScan(table) => { let hash_map = STREAM_INFO.read().unwrap(); - // let time_partition = hash_map - // .get(&stream_name) - // .ok_or(PostError::StreamNotFound(stream_name.clone()))? - // .time_partition - // .clone(); - + let time_partition = hash_map + .get(&stream_name) + .ok_or(DataFusionError::Execution(format!( + "stream not found {}", + stream_name.clone() + )))? 
+ .time_partition + .clone(); + let mut new_filters = vec![]; if !table_contains_any_time_filters(&table) { - - let start_time_filter = PartialTimeFilter::Low(std::ops::Bound::Included( - start_time, - )) - .binary_expr(Expr::Column(Column::new( - Some(table.table_name.to_owned_reference()), - // event::DEFAULT_TIMESTAMP_KEY, - "timestamp", - ))); - let end_time_filter = PartialTimeFilter::High(std::ops::Bound::Excluded(end_time)) - .binary_expr(Expr::Column(Column::new( - Some(table.table_name.to_owned_reference()), - // event::DEFAULT_TIMESTAMP_KEY, - "timestamp", - ))); - new_filters.push(start_time_filter); - new_filters.push(end_time_filter); + let mut _start_time_filter: Expr; + let mut _end_time_filter: Expr; + match time_partition { + Some(time_partition) => { + _start_time_filter = + PartialTimeFilter::Low(std::ops::Bound::Included(start_time)) + .binary_expr_timestamp_partition_key(Expr::Column(Column::new( + Some(table.table_name.to_owned_reference()), + time_partition.clone(), + ))); + _end_time_filter = + PartialTimeFilter::High(std::ops::Bound::Excluded(end_time)) + .binary_expr_timestamp_partition_key(Expr::Column(Column::new( + Some(table.table_name.to_owned_reference()), + time_partition, + ))); + } + None => { + _start_time_filter = + PartialTimeFilter::Low(std::ops::Bound::Included(start_time)) + .binary_expr_default_timestamp_key(Expr::Column(Column::new( + Some(table.table_name.to_owned_reference()), + event::DEFAULT_TIMESTAMP_KEY, + ))); + _end_time_filter = + PartialTimeFilter::High(std::ops::Bound::Excluded(end_time)) + .binary_expr_default_timestamp_key(Expr::Column(Column::new( + Some(table.table_name.to_owned_reference()), + event::DEFAULT_TIMESTAMP_KEY, + ))); + } + } + + new_filters.push(_start_time_filter); + new_filters.push(_end_time_filter); } if let Some(tag_filters) = filters.clone() { new_filters.push(tag_filters) } - println!("new_filters: {:?}", new_filters); let new_filter = new_filters.into_iter().reduce(and); - println!("new_filter: {:?}", new_filter); if let Some(new_filter) = new_filter { let filter = Filter::try_new(new_filter, Arc::new(LogicalPlan::TableScan(table))).unwrap(); diff --git a/server/src/query/stream_schema_provider.rs b/server/src/query/stream_schema_provider.rs index f3eead41b..f8f6202c8 100644 --- a/server/src/query/stream_schema_provider.rs +++ b/server/src/query/stream_schema_provider.rs @@ -40,7 +40,7 @@ use datafusion::{ optimizer::utils::conjunction, physical_expr::{create_physical_expr, PhysicalSortExpr}, physical_plan::{self, empty::EmptyExec, union::UnionExec, ExecutionPlan, Statistics}, - prelude::{Column, Expr}, + prelude::Expr, scalar::ScalarValue, }; use futures_util::{stream::FuturesOrdered, StreamExt, TryFutureExt, TryStreamExt}; @@ -287,9 +287,7 @@ impl TableProvider for StandardTableProvider { ) -> Result, DataFusionError> { let mut memory_exec = None; let mut cache_exec = None; - println!("filters: {:?}", filters); let time_filters = extract_primary_filter(filters); - println!("time_filters: {:?}", time_filters); if time_filters.is_empty() { return Err(DataFusionError::Plan("potentially unbounded query on time range. 
Table scanning requires atleast one time bound".to_string())); } @@ -316,7 +314,7 @@ impl TableProvider for StandardTableProvider { // Fetch snapshot let object_store_format = glob_storage - .get_snapshot(&self.stream) + .get_object_store_format(&self.stream) .await .map_err(|err| DataFusionError::Plan(err.to_string()))?; @@ -512,34 +510,54 @@ impl PartialTimeFilter { Some(value) } - pub fn binary_expr(&self, left: Expr) -> Expr { + pub fn binary_expr_default_timestamp_key(&self, left: Expr) -> Expr { let (op, right) = match self { PartialTimeFilter::Low(Bound::Excluded(time)) => { - (Operator::Gt, time) + (Operator::Gt, time.timestamp_millis()) } PartialTimeFilter::Low(Bound::Included(time)) => { - (Operator::GtEq, time) + (Operator::GtEq, time.timestamp_millis()) } PartialTimeFilter::High(Bound::Excluded(time)) => { - (Operator::Lt, time) + (Operator::Lt, time.timestamp_millis()) } PartialTimeFilter::High(Bound::Included(time)) => { - (Operator::LtEq, time) + (Operator::LtEq, time.timestamp_millis()) } - PartialTimeFilter::Eq(time) => (Operator::Eq, time), + PartialTimeFilter::Eq(time) => (Operator::Eq, time.timestamp_millis()), _ => unimplemented!(), }; - Expr::BinaryExpr(BinaryExpr::new( Box::new(left), op, - Box::new(Expr::Literal(ScalarValue::Utf8( - Some(format!("{:?}", right)), + Box::new(Expr::Literal(ScalarValue::TimestampMillisecond( + Some(right), + None, ))), )) } + pub fn binary_expr_timestamp_partition_key(&self, left: Expr) -> Expr { + let (op, right) = match self { + PartialTimeFilter::Low(Bound::Excluded(time)) => (Operator::Gt, time), + PartialTimeFilter::Low(Bound::Included(time)) => (Operator::GtEq, time), + PartialTimeFilter::High(Bound::Excluded(time)) => (Operator::Lt, time), + PartialTimeFilter::High(Bound::Included(time)) => (Operator::LtEq, time), + PartialTimeFilter::Eq(time) => (Operator::Eq, time), + _ => unimplemented!(), + }; + + Expr::BinaryExpr(BinaryExpr::new( + Box::new(left), + op, + Box::new(Expr::Literal(ScalarValue::Utf8(Some(format!( + "{:?}", + right + ))))), + )) + } + fn is_greater_than(&self, other: &NaiveDateTime) -> bool { match self { PartialTimeFilter::Low(Bound::Excluded(time)) => time >= other, @@ -616,18 +634,13 @@ fn expr_in_boundary(filter: &Expr) -> bool { } fn extract_from_lit(expr: &Expr) -> Option { - println!("expr: {:?}", expr); if let Expr::Literal(value) = expr { - println!("value: {:?}", value); match value { ScalarValue::TimestampMillisecond(Some(value), _) => { Some(NaiveDateTime::from_timestamp_millis(*value).unwrap()) - }, - ScalarValue::Utf8(Some(str_value)) => { - println!("str_value: {:?}", str_value); - Some(str_value.parse::().unwrap()) } - + ScalarValue::Utf8(Some(str_value)) => Some(str_value.parse::().unwrap()), + _ => None, } } else { @@ -636,20 +649,8 @@ fn extract_from_lit(expr: &Expr) -> Option { } fn extract_timestamp_bound(binexpr: &BinaryExpr) -> Option<(Operator, NaiveDateTime)> { - println!("binexpr: {:?}", binexpr); - println!("binexpr.left: {:?}", binexpr.left); - println!("binexpr.right: {:?}", binexpr.right); - let time = extract_from_lit(&binexpr.right)?; - println!("time: {:?}", time); Some((binexpr.op, time)) - // if matches!(&*binexpr.left, Expr::Column(Column { name, .. 
}) if name == DEFAULT_TIMESTAMP_KEY) - // { - // let time = extract_from_lit(&binexpr.right)?; - // Some((binexpr.op, time)) - // } else { - // None - // } } async fn collect_manifest_files( diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs index 7e58e9072..9f016f5a9 100644 --- a/server/src/storage/object_storage.rs +++ b/server/src/storage/object_storage.rs @@ -106,7 +106,11 @@ pub trait ObjectStorage: Sync + 'static { Ok(()) } - async fn create_stream(&self, stream_name: &str, time_partition: &str) -> Result<(), ObjectStorageError> { + async fn create_stream( + &self, + stream_name: &str, + time_partition: &str, + ) -> Result<(), ObjectStorageError> { let mut format = ObjectStoreFormat::default(); format.set_id(CONFIG.parseable.username.clone()); let permission = Permisssion::new(CONFIG.parseable.username.clone()); @@ -116,14 +120,13 @@ pub trait ObjectStorage: Sync + 'static { } else { format.time_partition = Some(time_partition.to_string()); } - + let format_json = to_bytes(&format); self.put_object(&schema_path(stream_name), to_bytes(&Schema::empty())) .await?; self.put_object(&stream_json_path(stream_name), format_json) .await?; - Ok(()) } @@ -274,7 +277,6 @@ pub trait ObjectStorage: Sync + 'static { path: &RelativePath, ) -> Result, ObjectStorageError> { let path = manifest_path(path.as_str()); - println!("path: {:?}", path); match self.get_object(&path).await { Ok(bytes) => Ok(Some( serde_json::from_slice(&bytes).expect("manifest is valid json"), @@ -298,13 +300,13 @@ pub trait ObjectStorage: Sync + 'static { self.put_object(&path, to_bytes(&manifest)).await } - - - async fn get_snapshot(&self, stream: &str) -> Result { + async fn get_object_store_format( + &self, + stream: &str, + ) -> Result { let path = stream_json_path(stream); let bytes = self.get_object(&path).await?; - Ok(serde_json::from_slice::(&bytes) - .expect("snapshot is valid json")) + Ok(serde_json::from_slice::(&bytes).expect("snapshot is valid json")) } async fn put_snapshot( @@ -362,11 +364,10 @@ pub trait ObjectStorage: Sync + 'static { let absolute_path = self .absolute_url(RelativePath::from_path(&stream_relative_path).unwrap()) .to_string(); - let manifest_file_path = get_stream_manifest_path(stream_relative_path); let store = CONFIG.storage().get_object_store(); let manifest = catalog::create_from_parquet_file(absolute_path.clone(), &file).unwrap(); - catalog::update_snapshot(store, stream, manifest,&manifest_file_path).await?; + catalog::update_snapshot(store, stream, manifest).await?; if cache_enabled && cache_manager.is_some() { cache_updates .entry(stream) @@ -412,7 +413,6 @@ pub trait ObjectStorage: Sync + 'static { } } - async fn commit_schema_to_storage( stream_name: &str, schema: Schema, @@ -454,9 +454,3 @@ fn alert_json_path(stream_name: &str) -> RelativePathBuf { fn manifest_path(prefix: &str) -> RelativePathBuf { RelativePathBuf::from_iter([prefix, MANIFEST_FILE]) } - -fn get_stream_manifest_path(stream_relative_path: String) -> String{ - let v: Vec<&str> = stream_relative_path.split("/").collect(); - let manifest_path = format!("{}/{}", v[0], v[1]); - manifest_path -} diff --git a/server/src/storage/staging.rs b/server/src/storage/staging.rs index 5413ce5fd..c07d0b24b 100644 --- a/server/src/storage/staging.rs +++ b/server/src/storage/staging.rs @@ -65,15 +65,7 @@ impl StorageDir { + &utils::minute_to_prefix(time.minute(), OBJECT_STORE_DATA_GRANULARITY).unwrap(); let local_uri = str::replace(&uri, "/", "."); let hostname = utils::hostname_unchecked(); - if 
extention == PARQUET_FILE_EXTENSION { - let mut rng = rand::thread_rng(); - let n: u64 = rng.gen(); - format!("{local_uri}{hostname}{n}.{extention}") - } - else{ - format!("{local_uri}{hostname}.{extention}") - } - + format!("{local_uri}{hostname}.{extention}") } fn filename_by_time(stream_hash: &str, time: NaiveDateTime) -> String { @@ -88,9 +80,15 @@ impl StorageDir { Self::filename_by_time(stream_hash, parsed_timestamp) } - pub fn path_by_current_time(&self, stream_hash: &str, parsed_timestamp: NaiveDateTime) -> PathBuf { - self.data_path - .join(Self::filename_by_current_time(stream_hash, parsed_timestamp)) + pub fn path_by_current_time( + &self, + stream_hash: &str, + parsed_timestamp: NaiveDateTime, + ) -> PathBuf { + self.data_path.join(Self::filename_by_current_time( + stream_hash, + parsed_timestamp, + )) } pub fn arrow_files(&self) -> Vec { @@ -166,11 +164,11 @@ impl StorageDir { fn arrow_path_to_parquet(path: &Path) -> PathBuf { let file_stem = path.file_stem().unwrap().to_str().unwrap(); let mut rng = rand::thread_rng(); - let n: u64 = rng.gen(); + let random_number: u64 = rng.gen(); let (_, filename) = file_stem.split_once('.').unwrap(); - let filename_with_random = format!("{}.{}.{}",filename, n, "arrows"); + let filename_with_random_number = format!("{}.{}.{}", filename, random_number, "arrows"); let mut parquet_path = path.to_owned(); - parquet_path.set_file_name(filename_with_random); + parquet_path.set_file_name(filename_with_random_number); parquet_path.set_extension("parquet"); parquet_path } From 9088a5daeb0b4a8d407d71ad99cf35dce16bcd79 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Tue, 5 Mar 2024 00:14:18 +0530 Subject: [PATCH 4/9] modified as per suggestions from deep source analysis --- server/src/handlers/http/ingest.rs | 20 ++++++++++---------- server/src/handlers/http/logstream.rs | 2 +- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/server/src/handlers/http/ingest.rs b/server/src/handlers/http/ingest.rs index 633c493b1..a080b7ec7 100644 --- a/server/src/handlers/http/ingest.rs +++ b/server/src/handlers/http/ingest.rs @@ -183,7 +183,7 @@ pub async fn create_stream_if_not_exists(stream_name: &str) -> Result<(), PostEr if STREAM_INFO.stream_exists(stream_name) { return Ok(()); } - super::logstream::create_stream(stream_name.to_string(), String::new()).await?; + super::logstream::create_stream(stream_name.to_string(), String::default()).await?; Ok(()) } @@ -282,7 +282,7 @@ mod tests { .append_header((PREFIX_META.to_string() + "C", "meta1")) .to_http_request(); - let (size, rb, _, _) = into_event_batch( + let (size, rb, ..) = into_event_batch( req, Bytes::from(serde_json::to_vec(&json).unwrap()), HashMap::default(), @@ -329,7 +329,7 @@ mod tests { let req = TestRequest::default().to_http_request(); - let (_, rb, _, _) = into_event_batch( + let (_, rb, ..) = into_event_batch( req, Bytes::from(serde_json::to_vec(&json).unwrap()), HashMap::default(), @@ -367,7 +367,7 @@ mod tests { let req = TestRequest::default().to_http_request(); - let (_, rb, _, _) = into_event_batch( + let (_, rb, ..) = into_event_batch( req, Bytes::from(serde_json::to_vec(&json).unwrap()), schema, @@ -429,7 +429,7 @@ mod tests { let req = TestRequest::default().to_http_request(); - let (_, rb, _, _) = into_event_batch( + let (_, rb, ..) = into_event_batch( req, Bytes::from(serde_json::to_vec(&json).unwrap()), schema, @@ -476,7 +476,7 @@ mod tests { let req = TestRequest::default().to_http_request(); - let (_, rb, _, _) = into_event_batch( + let (_, rb, ..) 
= into_event_batch( req, Bytes::from(serde_json::to_vec(&json).unwrap()), HashMap::default(), @@ -530,7 +530,7 @@ mod tests { let req = TestRequest::default().to_http_request(); - let (_, rb, _, _) = into_event_batch( + let (_, rb, ..) = into_event_batch( req, Bytes::from(serde_json::to_vec(&json).unwrap()), HashMap::default(), @@ -584,7 +584,7 @@ mod tests { ); let req = TestRequest::default().to_http_request(); - let (_, rb, _, _) = into_event_batch( + let (_, rb, ..) = into_event_batch( req, Bytes::from(serde_json::to_vec(&json).unwrap()), schema, @@ -630,7 +630,7 @@ mod tests { let req = TestRequest::default().to_http_request(); - let (_, rb, _, _) = into_event_batch( + let (_, rb, ..) = into_event_batch( req, Bytes::from(serde_json::to_vec(&json).unwrap()), HashMap::default(), @@ -715,7 +715,7 @@ mod tests { let req = TestRequest::default().to_http_request(); - let (_, rb, _, _) = into_event_batch( + let (_, rb,..) = into_event_batch( req, Bytes::from(serde_json::to_vec(&json).unwrap()), HashMap::default(), diff --git a/server/src/handlers/http/logstream.rs b/server/src/handlers/http/logstream.rs index c6aa8e2ff..6d6bac25c 100644 --- a/server/src/handlers/http/logstream.rs +++ b/server/src/handlers/http/logstream.rs @@ -109,7 +109,7 @@ pub async fn get_alert(req: HttpRequest) -> Result } pub async fn put_stream(req: HttpRequest) -> Result { - let mut time_partition: String = String::new(); + let mut time_partition: String = String::default(); if let Some((_, time_partition_name)) = req .headers() .iter() From 9665ac1a9b71148e78ed59fdc8a6820498fab44d Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Tue, 5 Mar 2024 00:21:23 +0530 Subject: [PATCH 5/9] cargo fmt changes --- server/src/handlers/http/ingest.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/handlers/http/ingest.rs b/server/src/handlers/http/ingest.rs index a080b7ec7..3de6e6244 100644 --- a/server/src/handlers/http/ingest.rs +++ b/server/src/handlers/http/ingest.rs @@ -715,7 +715,7 @@ mod tests { let req = TestRequest::default().to_http_request(); - let (_, rb,..) = into_event_batch( + let (_, rb, ..) = into_event_batch( req, Bytes::from(serde_json::to_vec(&json).unwrap()), HashMap::default(), From 08d6a92b1ac4279ad311c2b253cfeec4b3552b77 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Tue, 5 Mar 2024 14:57:09 +0530 Subject: [PATCH 6/9] fixed query with multiple filters --- server/src/query/stream_schema_provider.rs | 72 +++++++++++++--------- 1 file changed, 43 insertions(+), 29 deletions(-) diff --git a/server/src/query/stream_schema_provider.rs b/server/src/query/stream_schema_provider.rs index f8f6202c8..3a99b3623 100644 --- a/server/src/query/stream_schema_provider.rs +++ b/server/src/query/stream_schema_provider.rs @@ -287,12 +287,24 @@ impl TableProvider for StandardTableProvider { ) -> Result, DataFusionError> { let mut memory_exec = None; let mut cache_exec = None; - let time_filters = extract_primary_filter(filters); + let object_store = state + .runtime_env() + .object_store_registry + .get_store(&self.url) + .unwrap(); + let glob_storage = CONFIG.storage().get_object_store(); + + let object_store_format = glob_storage + .get_object_store_format(&self.stream) + .await + .map_err(|err| DataFusionError::Plan(err.to_string()))?; + let time_partition = object_store_format.time_partition; + let time_filters = extract_primary_filter(filters, time_partition.clone()); if time_filters.is_empty() { return Err(DataFusionError::Plan("potentially unbounded query on time range. 
Table scanning requires atleast one time bound".to_string())); } - if include_now(filters) { + if include_now(filters, time_partition) { if let Some(records) = event::STREAM_WRITERS.recordbatches_cloned(&self.stream, &self.schema) { @@ -305,19 +317,7 @@ impl TableProvider for StandardTableProvider { } }; - let object_store = state - .runtime_env() - .object_store_registry - .get_store(&self.url) - .unwrap(); - let glob_storage = CONFIG.storage().get_object_store(); - // Fetch snapshot - let object_store_format = glob_storage - .get_object_store_format(&self.stream) - .await - .map_err(|err| DataFusionError::Plan(err.to_string()))?; - let snapshot = object_store_format.snapshot; // Is query timerange is overlapping with older data. @@ -493,11 +493,11 @@ pub enum PartialTimeFilter { } impl PartialTimeFilter { - fn try_from_expr(expr: &Expr) -> Option { + fn try_from_expr(expr: &Expr, time_partition: Option) -> Option { let Expr::BinaryExpr(binexpr) = expr else { return None; }; - let (op, time) = extract_timestamp_bound(binexpr)?; + let (op, time) = extract_timestamp_bound(binexpr.clone(), time_partition)?; let value = match op { Operator::Gt => PartialTimeFilter::Low(Bound::Excluded(time)), Operator::GtEq => PartialTimeFilter::Low(Bound::Included(time)), @@ -587,14 +587,14 @@ fn is_overlapping_query( .all(|filter| filter.is_greater_than(&first_entry_upper_bound.naive_utc())) } -fn include_now(filters: &[Expr]) -> bool { +fn include_now(filters: &[Expr], time_partition: Option) -> bool { let current_minute = Utc::now() .with_second(0) .and_then(|x| x.with_nanosecond(0)) .expect("zeroed value is valid") .naive_utc(); - let time_filters = extract_primary_filter(filters); + let time_filters = extract_primary_filter(filters, time_partition); let upper_bound_matches = time_filters.iter().any(|filter| match filter { PartialTimeFilter::High(Bound::Excluded(time)) @@ -619,7 +619,7 @@ fn expr_in_boundary(filter: &Expr) -> bool { let Expr::BinaryExpr(binexpr) = filter else { return false; }; - let Some((op, time)) = extract_timestamp_bound(binexpr) else { + let Some((op, time)) = extract_timestamp_bound(binexpr.clone(), None) else { return false; }; @@ -633,14 +633,23 @@ fn expr_in_boundary(filter: &Expr) -> bool { ) } -fn extract_from_lit(expr: &Expr) -> Option { - if let Expr::Literal(value) = expr { +fn extract_from_lit(expr: BinaryExpr, time_partition: Option) -> Option { + let mut column_name: String = String::default(); + if let Expr::Column(column) = *expr.left { + column_name = column.name; + } + if let Expr::Literal(value) = *expr.right { match value { ScalarValue::TimestampMillisecond(Some(value), _) => { - Some(NaiveDateTime::from_timestamp_millis(*value).unwrap()) + Some(NaiveDateTime::from_timestamp_millis(value).unwrap()) + } + ScalarValue::Utf8(Some(str_value)) => { + if time_partition.is_some() && column_name == time_partition.unwrap() { + Some(str_value.parse::().unwrap()) + } else { + None + } } - ScalarValue::Utf8(Some(str_value)) => Some(str_value.parse::().unwrap()), - _ => None, } } else { @@ -648,9 +657,11 @@ fn extract_from_lit(expr: &Expr) -> Option { } } -fn extract_timestamp_bound(binexpr: &BinaryExpr) -> Option<(Operator, NaiveDateTime)> { - let time = extract_from_lit(&binexpr.right)?; - Some((binexpr.op, time)) +fn extract_timestamp_bound( + binexpr: BinaryExpr, + time_partition: Option, +) -> Option<(Operator, NaiveDateTime)> { + Some((binexpr.op, extract_from_lit(binexpr, time_partition)?)) } async fn collect_manifest_files( @@ -676,11 +687,14 @@ async fn 
collect_manifest_files( } // extract start time and end time from filter preficate -fn extract_primary_filter(filters: &[Expr]) -> Vec { +fn extract_primary_filter( + filters: &[Expr], + time_partition: Option, +) -> Vec { let mut time_filters = Vec::new(); filters.iter().for_each(|expr| { let _ = expr.apply(&mut |expr| { - let time = PartialTimeFilter::try_from_expr(expr); + let time = PartialTimeFilter::try_from_expr(expr, time_partition.clone()); if let Some(time) = time { time_filters.push(time); Ok(VisitRecursion::Stop) From 6592caa4369281816ada6588c5c01dcd1e2ce0fd Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Wed, 6 Mar 2024 13:31:35 +0530 Subject: [PATCH 7/9] fix for ingesting event batch --- server/src/handlers/http/ingest.rs | 242 +++++++++++------------------ server/src/utils/json.rs | 10 ++ 2 files changed, 101 insertions(+), 151 deletions(-) diff --git a/server/src/handlers/http/ingest.rs b/server/src/handlers/http/ingest.rs index 3de6e6244..8905af715 100644 --- a/server/src/handlers/http/ingest.rs +++ b/server/src/handlers/http/ingest.rs @@ -19,7 +19,7 @@ use actix_web::{http::header::ContentType, HttpRequest, HttpResponse}; use arrow_schema::Field; use bytes::Bytes; -use chrono::{DateTime, NaiveDateTime, Utc}; +use chrono::{DateTime, Utc}; use http::StatusCode; use serde_json::Value; use std::collections::{BTreeMap, HashMap}; @@ -34,6 +34,7 @@ use crate::handlers::{ }; use crate::metadata::STREAM_INFO; use crate::utils::header_parsing::{collect_labelled_headers, ParseHeaderError}; +use crate::utils::json::convert_array_to_object; use super::logstream::error::CreateStreamError; use super::{kinesis, otel}; @@ -94,88 +95,101 @@ pub async fn post_event(req: HttpRequest, body: Bytes) -> Result Result<(), PostError> { - let (size, rb, is_first_event, parsed_timestamp) = { - let hash_map = STREAM_INFO.read().unwrap(); - let schema = hash_map - .get(&stream_name) - .ok_or(PostError::StreamNotFound(stream_name.clone()))? - .schema - .clone(); - - let time_partition = hash_map - .get(&stream_name) - .ok_or(PostError::StreamNotFound(stream_name.clone()))? - .time_partition - .clone(); - - into_event_batch(req, body, schema, time_partition)? - }; - - event::Event { - rb, - stream_name, - origin_format: "json", - origin_size: size as u64, - is_first_event, - parsed_timestamp, - } - .process() - .await?; - - Ok(()) -} + let hash_map = STREAM_INFO.read().unwrap(); + let schema = hash_map + .get(&stream_name) + .ok_or(PostError::StreamNotFound(stream_name.clone()))? + .schema + .clone(); + + let time_partition = hash_map + .get(&stream_name) + .ok_or(PostError::StreamNotFound(stream_name.clone()))? 
+ .time_partition + .clone(); + let body_val: Value = serde_json::from_slice(&body)?; + let size: usize = body.len(); + let mut parsed_timestamp = Utc::now().naive_utc(); + if time_partition.is_none() { + let (rb, is_first_event) = into_event_batch(req, body_val, schema)?; + event::Event { + rb, + stream_name, + origin_format: "json", + origin_size: size as u64, + is_first_event, + parsed_timestamp, + } + -fn into_event_batch( - req: HttpRequest, - body: Bytes, - schema: HashMap>, - time_partition: Option, -) -> Result<(usize, arrow_array::RecordBatch, bool, NaiveDateTime), PostError> { - let tags = collect_labelled_headers(&req, PREFIX_TAGS, SEPARATOR)?; - let metadata = collect_labelled_headers(&req, PREFIX_META, SEPARATOR)?; - let size = body.len(); - let body: Value = serde_json::from_slice(&body)?; - let mut ingestion_prefix_timestamp = Utc::now().naive_utc(); - - if time_partition.is_some() { - let body_timestamp = body.get(&time_partition.clone().unwrap().to_string()); - if body_timestamp.is_some() { - if body_timestamp - .unwrap() - .to_owned() - .as_str() - .unwrap() - .parse::>() - .is_ok() - { - ingestion_prefix_timestamp = body_timestamp + .process() + .await? + } else { + let data = convert_array_to_object(body_val.clone())?; + for value in data { + let body_timestamp = value.get(&time_partition.clone().unwrap().to_string()); + if body_timestamp.is_some() { + if body_timestamp .unwrap() .to_owned() .as_str() .unwrap() .parse::>() - .unwrap() - .naive_utc(); + .is_ok() + { + parsed_timestamp = body_timestamp + .unwrap() + .to_owned() + .as_str() + .unwrap() + .parse::>() + .unwrap() + .naive_utc(); + + let (rb, is_first_event) = into_event_batch(req.clone(), value, schema.clone())?; + event::Event { + rb, + stream_name: stream_name.clone(), + origin_format: "json", + origin_size: size as u64, + is_first_event, + parsed_timestamp, + } + .process() + .await?; + } else { + return Err(PostError::Invalid(anyhow::Error::msg(format!( + "field {} is not in the correct datetime format", + body_timestamp.unwrap().to_owned().as_str().unwrap() + )))); + } } else { return Err(PostError::Invalid(anyhow::Error::msg(format!( - "field {} is not in the correct datetime format", - body_timestamp.unwrap().to_owned().as_str().unwrap() + "ingestion failed as field {} is not part of the log", + time_partition.unwrap() )))); } - } else { - return Err(PostError::Invalid(anyhow::Error::msg(format!( - "ingestion failed as field {} is not part of the log", - time_partition.unwrap() - )))); } } + Ok(()) +} + +fn into_event_batch( + req: HttpRequest, + body: Value, + schema: HashMap>, +) -> Result<(arrow_array::RecordBatch, bool), PostError> { + let tags = collect_labelled_headers(&req, PREFIX_TAGS, SEPARATOR)?; + let metadata = collect_labelled_headers(&req, PREFIX_META, SEPARATOR)?; + let event = format::json::Event { data: body, tags, metadata, }; let (rb, is_first) = event.into_recordbatch(schema)?; - Ok((size, rb, is_first, ingestion_prefix_timestamp)) + + Ok((rb, is_first)) } // Check if the stream exists and create a new stream if doesn't exist @@ -235,7 +249,6 @@ mod tests { types::Int64Type, ArrayRef, Float64Array, Int64Array, ListArray, StringArray, }; use arrow_schema::{DataType, Field}; - use bytes::Bytes; use serde_json::json; use crate::{ @@ -282,15 +295,8 @@ mod tests { .append_header((PREFIX_META.to_string() + "C", "meta1")) .to_http_request(); - let (size, rb, ..) 
= into_event_batch( - req, - Bytes::from(serde_json::to_vec(&json).unwrap()), - HashMap::default(), - None, - ) - .unwrap(); + let (rb, ..) = into_event_batch(req, json, HashMap::default()).unwrap(); - assert_eq!(size, 28); assert_eq!(rb.num_rows(), 1); assert_eq!(rb.num_columns(), 6); assert_eq!( @@ -329,13 +335,7 @@ mod tests { let req = TestRequest::default().to_http_request(); - let (_, rb, ..) = into_event_batch( - req, - Bytes::from(serde_json::to_vec(&json).unwrap()), - HashMap::default(), - None, - ) - .unwrap(); + let (rb, ..) = into_event_batch(req, json, HashMap::default()).unwrap(); assert_eq!(rb.num_rows(), 1); assert_eq!(rb.num_columns(), 5); @@ -367,13 +367,7 @@ mod tests { let req = TestRequest::default().to_http_request(); - let (_, rb, ..) = into_event_batch( - req, - Bytes::from(serde_json::to_vec(&json).unwrap()), - schema, - None, - ) - .unwrap(); + let (rb, ..) = into_event_batch(req, json, schema).unwrap(); assert_eq!(rb.num_rows(), 1); assert_eq!(rb.num_columns(), 5); @@ -405,13 +399,7 @@ mod tests { let req = TestRequest::default().to_http_request(); - assert!(into_event_batch( - req, - Bytes::from(serde_json::to_vec(&json).unwrap()), - schema, - None, - ) - .is_err()); + assert!(into_event_batch(req, json, schema,).is_err()); } #[test] @@ -429,13 +417,7 @@ mod tests { let req = TestRequest::default().to_http_request(); - let (_, rb, ..) = into_event_batch( - req, - Bytes::from(serde_json::to_vec(&json).unwrap()), - schema, - None, - ) - .unwrap(); + let (rb, ..) = into_event_batch(req, json, schema).unwrap(); assert_eq!(rb.num_rows(), 1); assert_eq!(rb.num_columns(), 3); @@ -447,13 +429,7 @@ mod tests { let req = TestRequest::default().to_http_request(); - assert!(into_event_batch( - req, - Bytes::from(serde_json::to_vec(&json).unwrap()), - HashMap::default(), - None, - ) - .is_err()) + assert!(into_event_batch(req, json, HashMap::default(),).is_err()) } #[test] @@ -476,13 +452,7 @@ mod tests { let req = TestRequest::default().to_http_request(); - let (_, rb, ..) = into_event_batch( - req, - Bytes::from(serde_json::to_vec(&json).unwrap()), - HashMap::default(), - None, - ) - .unwrap(); + let (rb, ..) = into_event_batch(req, json, HashMap::default()).unwrap(); assert_eq!(rb.num_rows(), 3); assert_eq!(rb.num_columns(), 6); @@ -530,13 +500,7 @@ mod tests { let req = TestRequest::default().to_http_request(); - let (_, rb, ..) = into_event_batch( - req, - Bytes::from(serde_json::to_vec(&json).unwrap()), - HashMap::default(), - None, - ) - .unwrap(); + let (rb, ..) = into_event_batch(req, json, HashMap::default()).unwrap(); assert_eq!(rb.num_rows(), 3); assert_eq!(rb.num_columns(), 6); @@ -584,13 +548,7 @@ mod tests { ); let req = TestRequest::default().to_http_request(); - let (_, rb, ..) = into_event_batch( - req, - Bytes::from(serde_json::to_vec(&json).unwrap()), - schema, - None, - ) - .unwrap(); + let (rb, ..) = into_event_batch(req, json, schema).unwrap(); assert_eq!(rb.num_rows(), 3); assert_eq!(rb.num_columns(), 6); @@ -630,13 +588,7 @@ mod tests { let req = TestRequest::default().to_http_request(); - let (_, rb, ..) = into_event_batch( - req, - Bytes::from(serde_json::to_vec(&json).unwrap()), - HashMap::default(), - None, - ) - .unwrap(); + let (rb, ..) 
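// Illustrative usage of the convert_array_to_object helper added in this patch (see the
// utils/json.rs hunk below): a batched ingest body is split into one JSON object per event,
// which is what lets each event carry its own parsed_timestamp. This sketch assumes only
// serde_json and anyhow, plus the helper's flattening accepting an array of flat objects.
fn split_batch_example() -> Result<(), anyhow::Error> {
    use crate::utils::json::convert_array_to_object;

    let body = serde_json::json!([
        {"timestamp": "2024-03-06T13:31:35.000Z", "level": "info"},
        {"timestamp": "2024-03-06T13:32:10.000Z", "level": "warn"}
    ]);
    // one Value per log event, nested keys flattened by flatten_json_body
    let events = convert_array_to_object(body)?;
    assert_eq!(events.len(), 2);
    Ok(())
}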
= into_event_batch(req, json, HashMap::default()).unwrap(); assert_eq!(rb.num_rows(), 3); assert_eq!(rb.num_columns(), 5); @@ -681,13 +633,7 @@ mod tests { .into_iter(), ); - assert!(into_event_batch( - req, - Bytes::from(serde_json::to_vec(&json).unwrap()), - schema, - None, - ) - .is_err()); + assert!(into_event_batch(req, json, schema,).is_err()); } #[test] @@ -715,13 +661,7 @@ mod tests { let req = TestRequest::default().to_http_request(); - let (_, rb, ..) = into_event_batch( - req, - Bytes::from(serde_json::to_vec(&json).unwrap()), - HashMap::default(), - None, - ) - .unwrap(); + let (rb, ..) = into_event_batch(req, json, HashMap::default()).unwrap(); assert_eq!(rb.num_rows(), 4); assert_eq!(rb.num_columns(), 7); diff --git a/server/src/utils/json.rs b/server/src/utils/json.rs index 0f18d4bf7..b448d6860 100644 --- a/server/src/utils/json.rs +++ b/server/src/utils/json.rs @@ -25,6 +25,16 @@ pub fn flatten_json_body(body: serde_json::Value) -> Result Result, anyhow::Error> { + let data = flatten_json_body(body)?; + let value_arr = match data { + Value::Array(arr) => arr, + value @ Value::Object(_) => vec![value], + _ => unreachable!("flatten would have failed beforehand"), + }; + Ok(value_arr) +} + pub fn convert_to_string(value: &Value) -> Value { match value { Value::Null => Value::String("null".to_owned()), From c04cf4f5f1a9871ea857460e526fef664b1453e4 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Wed, 6 Mar 2024 18:02:08 +0530 Subject: [PATCH 8/9] fix the count of ingested events review comments incorporated --- server/src/catalog.rs | 22 +++++---- server/src/handlers/http/ingest.rs | 64 +++++++++++++++------------ server/src/handlers/http/logstream.rs | 30 +++++++------ 3 files changed, 66 insertions(+), 50 deletions(-) diff --git a/server/src/catalog.rs b/server/src/catalog.rs index c9502d1ef..a4e5cfebd 100644 --- a/server/src/catalog.rs +++ b/server/src/catalog.rs @@ -74,7 +74,7 @@ fn get_file_bounds( file: &manifest::File, partition_column: String, ) -> (DateTime, DateTime) { - if partition_column == *DEFAULT_TIMESTAMP_KEY.to_string() { + if partition_column == DEFAULT_TIMESTAMP_KEY { match file .columns() .iter() @@ -122,14 +122,18 @@ pub async fn update_snapshot( let mut meta = storage.get_object_store_format(stream_name).await?; let manifests = &mut meta.snapshot.manifest_list; let time_partition = meta.time_partition; - let mut _lower_bound = Utc::now(); - if time_partition.is_none() { - (_lower_bound, _) = get_file_bounds(&change, DEFAULT_TIMESTAMP_KEY.to_string()); - } else { - (_lower_bound, _) = get_file_bounds(&change, time_partition.unwrap().to_string()); - } + let lower_bound = match time_partition { + Some(time_partition) => { + let (lower_bound, _) = get_file_bounds(&change, time_partition); + lower_bound + } + None => { + let (lower_bound, _) = get_file_bounds(&change, DEFAULT_TIMESTAMP_KEY.to_string()); + lower_bound + } + }; let pos = manifests.iter().position(|item| { - item.time_lower_bound <= _lower_bound && _lower_bound < item.time_upper_bound + item.time_lower_bound <= lower_bound && lower_bound < item.time_upper_bound }); // We update the manifest referenced by this position @@ -149,7 +153,7 @@ pub async fn update_snapshot( manifest.apply_change(change); storage.put_manifest(&manifest_path, manifest).await?; } else { - let lower_bound = _lower_bound.date_naive().and_time(NaiveTime::MIN).and_utc(); + let lower_bound = lower_bound.date_naive().and_time(NaiveTime::MIN).and_utc(); let upper_bound = lower_bound .date_naive() .and_time( diff --git 
a/server/src/handlers/http/ingest.rs b/server/src/handlers/http/ingest.rs index 8905af715..1f09e9a07 100644 --- a/server/src/handlers/http/ingest.rs +++ b/server/src/handlers/http/ingest.rs @@ -16,15 +16,6 @@ * */ -use actix_web::{http::header::ContentType, HttpRequest, HttpResponse}; -use arrow_schema::Field; -use bytes::Bytes; -use chrono::{DateTime, Utc}; -use http::StatusCode; -use serde_json::Value; -use std::collections::{BTreeMap, HashMap}; -use std::sync::Arc; - use crate::event::error::EventError; use crate::event::format::EventFormat; use crate::event::{self, format}; @@ -33,8 +24,17 @@ use crate::handlers::{ STREAM_NAME_HEADER_KEY, }; use crate::metadata::STREAM_INFO; +use crate::option::CONFIG; use crate::utils::header_parsing::{collect_labelled_headers, ParseHeaderError}; use crate::utils::json::convert_array_to_object; +use actix_web::{http::header::ContentType, HttpRequest, HttpResponse}; +use arrow_schema::Field; +use bytes::Bytes; +use chrono::{DateTime, Utc}; +use http::StatusCode; +use serde_json::Value; +use std::collections::{BTreeMap, HashMap}; +use std::sync::Arc; use super::logstream::error::CreateStreamError; use super::{kinesis, otel}; @@ -95,35 +95,28 @@ pub async fn post_event(req: HttpRequest, body: Bytes) -> Result Result<(), PostError> { - let hash_map = STREAM_INFO.read().unwrap(); - let schema = hash_map - .get(&stream_name) - .ok_or(PostError::StreamNotFound(stream_name.clone()))? - .schema - .clone(); - - let time_partition = hash_map - .get(&stream_name) - .ok_or(PostError::StreamNotFound(stream_name.clone()))? - .time_partition - .clone(); + let glob_storage = CONFIG.storage().get_object_store(); + let object_store_format = glob_storage + .get_object_store_format(&stream_name) + .await + .map_err(|_err| PostError::StreamNotFound(stream_name.clone()))?; + let time_partition = object_store_format.time_partition; let body_val: Value = serde_json::from_slice(&body)?; let size: usize = body.len(); let mut parsed_timestamp = Utc::now().naive_utc(); if time_partition.is_none() { - let (rb, is_first_event) = into_event_batch(req, body_val, schema)?; + let stream = stream_name.clone(); + let (rb, is_first_event) = get_stream_schema(stream.clone(), req, body_val)?; event::Event { rb, - stream_name, + stream_name: stream, origin_format: "json", origin_size: size as u64, is_first_event, parsed_timestamp, } - - .process() - .await? + .await?; } else { let data = convert_array_to_object(body_val.clone())?; for value in data { @@ -146,7 +139,8 @@ async fn push_logs(stream_name: String, req: HttpRequest, body: Bytes) -> Result .unwrap() .naive_utc(); - let (rb, is_first_event) = into_event_batch(req.clone(), value, schema.clone())?; + let (rb, is_first_event) = + get_stream_schema(stream_name.clone(), req.clone(), value)?; event::Event { rb, stream_name: stream_name.clone(), @@ -174,6 +168,20 @@ async fn push_logs(stream_name: String, req: HttpRequest, body: Bytes) -> Result Ok(()) } +fn get_stream_schema( + stream_name: String, + req: HttpRequest, + body: Value, +) -> Result<(arrow_array::RecordBatch, bool), PostError> { + let hash_map = STREAM_INFO.read().unwrap(); + let schema = hash_map + .get(&stream_name) + .ok_or(PostError::StreamNotFound(stream_name))? 
+ .schema + .clone(); + into_event_batch(req, body, schema) +} + fn into_event_batch( req: HttpRequest, body: Value, @@ -197,7 +205,7 @@ pub async fn create_stream_if_not_exists(stream_name: &str) -> Result<(), PostEr if STREAM_INFO.stream_exists(stream_name) { return Ok(()); } - super::logstream::create_stream(stream_name.to_string(), String::default()).await?; + super::logstream::create_stream(stream_name.to_string(), "").await?; Ok(()) } diff --git a/server/src/handlers/http/logstream.rs b/server/src/handlers/http/logstream.rs index 6d6bac25c..20fd6330e 100644 --- a/server/src/handlers/http/logstream.rs +++ b/server/src/handlers/http/logstream.rs @@ -109,14 +109,15 @@ pub async fn get_alert(req: HttpRequest) -> Result } pub async fn put_stream(req: HttpRequest) -> Result { - let mut time_partition: String = String::default(); - if let Some((_, time_partition_name)) = req + let time_partition = if let Some((_, time_partition_name)) = req .headers() .iter() .find(|&(key, _)| key == TIME_PARTITION_KEY) { - time_partition = time_partition_name.to_str().unwrap().to_owned(); - } + time_partition_name.to_str().unwrap() + } else { + "" + }; let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); if metadata::STREAM_INFO.stream_exists(&stream_name) { @@ -337,27 +338,30 @@ fn remove_id_from_alerts(value: &mut Value) { pub async fn create_stream( stream_name: String, - time_partition: String, + time_partition: &str, ) -> Result<(), CreateStreamError> { // fail to proceed if invalid stream name validator::stream_name(&stream_name)?; // Proceed to create log stream if it doesn't exist let storage = CONFIG.storage().get_object_store(); - if let Err(err) = storage.create_stream(&stream_name, &time_partition).await { + if let Err(err) = storage.create_stream(&stream_name, time_partition).await { return Err(CreateStreamError::Storage { stream_name, err }); } - let stream_meta: Result = - CONFIG - .storage() - .get_object_store() - .get_stream_metadata(&stream_name) - .await; + let stream_meta = CONFIG + .storage() + .get_object_store() + .get_stream_metadata(&stream_name) + .await; let stream_meta = stream_meta.unwrap(); let created_at = stream_meta.created_at; - metadata::STREAM_INFO.add_stream(stream_name.to_string(), created_at, time_partition); + metadata::STREAM_INFO.add_stream( + stream_name.to_string(), + created_at, + time_partition.to_string(), + ); Ok(()) } From 4a60cfb9a474446bd44e19aa3fac571be0b6b5cc Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Thu, 7 Mar 2024 00:01:04 +0530 Subject: [PATCH 9/9] fixed review comments --- server/src/catalog.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/catalog.rs b/server/src/catalog.rs index a4e5cfebd..f44159612 100644 --- a/server/src/catalog.rs +++ b/server/src/catalog.rs @@ -81,7 +81,7 @@ fn get_file_bounds( .find(|col| col.name == partition_column) .unwrap() .stats - .clone() + .as_ref() .unwrap() { column::TypedStatistics::Int(stats) => ( @@ -101,7 +101,7 @@ fn get_file_bounds( .find(|col| col.name == partition_column) .unwrap() .stats - .clone() + .as_ref() .unwrap() { column::TypedStatistics::String(stats) => (