From 0ea2511acd92f2ce36caaa0d0973674f2481f8c6 Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Thu, 16 May 2024 13:34:41 +0530 Subject: [PATCH] fix: issues caused by prev commit --- server/src/event.rs | 13 ++++++------- server/src/event/writer.rs | 16 +++++++++++++--- server/src/event/writer/file_writer.rs | 4 ++-- server/src/handlers/http/ingest.rs | 11 +++++++---- server/src/storage/staging.rs | 23 +++++++++++------------ server/src/utils.rs | 2 +- 6 files changed, 40 insertions(+), 29 deletions(-) diff --git a/server/src/event.rs b/server/src/event.rs index d546c88fe..6077ccb30 100644 --- a/server/src/event.rs +++ b/server/src/event.rs @@ -74,7 +74,7 @@ impl Event { &key, self.rb.clone(), self.parsed_timestamp, - self.custom_partition_values, + &self.custom_partition_values, )?; metadata::STREAM_INFO.update_stats( @@ -96,7 +96,7 @@ impl Event { Ok(()) } - pub fn process_unchecked(self) -> Result { + pub fn process_unchecked(&self) -> Result<(), PostError> { let key = get_schema_key(&self.rb.schema().fields); Self::process_event( @@ -104,10 +104,9 @@ impl Event { &key, self.rb.clone(), self.parsed_timestamp, + &self.custom_partition_values, ) - .map_err(PostError::Event)?; - - Ok(self) + .map_err(PostError::Event) } pub fn clear(&self, stream_name: &str) { @@ -121,14 +120,14 @@ impl Event { schema_key: &str, rb: RecordBatch, parsed_timestamp: NaiveDateTime, - custom_partition_values: HashMap, + custom_partition_values: &HashMap, ) -> Result<(), EventError> { STREAM_WRITERS.append_to_local( stream_name, schema_key, rb, parsed_timestamp, - custom_partition_values, + custom_partition_values.clone(), )?; Ok(()) } diff --git a/server/src/event/writer.rs b/server/src/event/writer.rs index 59c181136..5ccc91e34 100644 --- a/server/src/event/writer.rs +++ b/server/src/event/writer.rs @@ -53,7 +53,7 @@ impl Writer { schema_key: &str, rb: RecordBatch, parsed_timestamp: NaiveDateTime, - custom_partition_values: HashMap, + custom_partition_values: &HashMap, ) -> 
Result<(), StreamWriterError> { let rb = utils::arrow::replace_columns( rb.schema(), @@ -102,7 +102,7 @@ impl WriterTable { schema_key, record, parsed_timestamp, - custom_partition_values, + &custom_partition_values, )?; } None => { @@ -110,7 +110,14 @@ impl WriterTable { let map = self.write().unwrap(); // check for race condition // if map contains entry then just - self.handle_missing_writer(map, stream_name, schema_key, record, parsed_timestamp)?; + self.handle_missing_writer( + map, + stream_name, + schema_key, + record, + parsed_timestamp, + &custom_partition_values, + )?; } }; Ok(()) @@ -123,6 +130,7 @@ impl WriterTable { schema_key: &str, record: RecordBatch, parsed_timestamp: NaiveDateTime, + custom_partition_values: &HashMap, ) -> Result<(), StreamWriterError> { if CONFIG.parseable.mode != Mode::Query { stream_writer.lock().unwrap().push( @@ -130,6 +138,7 @@ impl WriterTable { schema_key, record, parsed_timestamp, + custom_partition_values, )?; } else { stream_writer @@ -148,6 +157,7 @@ impl WriterTable { schema_key: &str, record: RecordBatch, parsed_timestamp: NaiveDateTime, + custom_partition_values: &HashMap, ) -> Result<(), StreamWriterError> { match map.get(stream_name) { Some(writer) => { diff --git a/server/src/event/writer/file_writer.rs b/server/src/event/writer/file_writer.rs index 78ba01d22..488f8da8a 100644 --- a/server/src/event/writer/file_writer.rs +++ b/server/src/event/writer/file_writer.rs @@ -45,7 +45,7 @@ impl FileWriter { schema_key: &str, record: &RecordBatch, parsed_timestamp: NaiveDateTime, - custom_partition_values: HashMap, + custom_partition_values: &HashMap, ) -> Result<(), StreamWriterError> { match self.get_mut(schema_key) { Some(writer) => { @@ -89,7 +89,7 @@ fn init_new_stream_writer_file( schema_key: &str, record: &RecordBatch, parsed_timestamp: NaiveDateTime, - custom_partition_values: HashMap, + custom_partition_values: &HashMap, ) -> Result<(PathBuf, StreamWriter), StreamWriterError> { let dir = 
StorageDir::new(stream_name); let path = dir.path_by_current_time(schema_key, parsed_timestamp, custom_partition_values); diff --git a/server/src/handlers/http/ingest.rs b/server/src/handlers/http/ingest.rs index 878f01269..f03b45652 100644 --- a/server/src/handlers/http/ingest.rs +++ b/server/src/handlers/http/ingest.rs @@ -102,16 +102,19 @@ pub async fn push_logs_unchecked( batches: RecordBatch, stream_name: &str, ) -> Result { - event::Event { + let unchecked_event = event::Event { rb: batches, stream_name: stream_name.to_string(), origin_format: "json", origin_size: 0, parsed_timestamp: Utc::now().naive_utc(), time_partition: None, - is_first_event: true, // NOTE: Maybe should be false - } - .process_unchecked() + is_first_event: true, // NOTE: Maybe should be false + custom_partition_values: HashMap::new(), // should be an empty map for unchecked push + }; + unchecked_event.process_unchecked()?; + + Ok(unchecked_event) } async fn push_logs(stream_name: String, req: HttpRequest, body: Bytes) -> Result<(), PostError> { diff --git a/server/src/storage/staging.rs b/server/src/storage/staging.rs index fbe15a99f..b30c8689c 100644 --- a/server/src/storage/staging.rs +++ b/server/src/storage/staging.rs @@ -52,7 +52,7 @@ use std::{ }; const ARROW_FILE_EXTENSION: &str = "data.arrows"; -const PARQUET_FILE_EXTENSION: &str = "data.parquet"; +// const PARQUET_FILE_EXTENSION: &str = "data.parquet"; #[derive(Debug)] pub struct StorageDir { @@ -68,7 +68,7 @@ impl StorageDir { pub fn file_time_suffix( time: NaiveDateTime, - custom_partition_values: HashMap, + custom_partition_values: &HashMap, extention: &str, ) -> String { let mut uri = utils::date_to_prefix(time.date()) @@ -90,7 +90,7 @@ impl StorageDir { fn filename_by_time( stream_hash: &str, time: NaiveDateTime, - custom_partition_values: HashMap, + custom_partition_values: &HashMap, ) -> String { format!( "{}.{}", @@ -102,7 +102,7 @@ impl StorageDir { fn filename_by_current_time( stream_hash: &str, parsed_timestamp: 
NaiveDateTime, - custom_partition_values: HashMap, + custom_partition_values: &HashMap, ) -> String { Self::filename_by_time(stream_hash, parsed_timestamp, custom_partition_values) } @@ -111,7 +111,7 @@ impl StorageDir { &self, stream_hash: &str, parsed_timestamp: NaiveDateTime, - custom_partition_values: HashMap, + custom_partition_values: &HashMap, ) -> PathBuf { let server_time_in_min = Utc::now().format("%Y%m%dT%H%M").to_string(); let mut filename = @@ -201,13 +201,12 @@ impl StorageDir { } } -#[allow(unused)] -pub fn to_parquet_path(stream_name: &str, time: NaiveDateTime) -> PathBuf { - let data_path = CONFIG.parseable.local_stream_data_path(stream_name); - let dir = StorageDir::file_time_suffix(time, HashMap::new(), PARQUET_FILE_EXTENSION); - - data_path.join(dir) -} +// pub fn to_parquet_path(stream_name: &str, time: NaiveDateTime) -> PathBuf { +// let data_path = CONFIG.parseable.local_stream_data_path(stream_name); +// let dir = StorageDir::file_time_suffix(time, &HashMap::new(), PARQUET_FILE_EXTENSION); +// +// data_path.join(dir) +// } pub fn convert_disk_files_to_parquet( stream: &str, diff --git a/server/src/utils.rs b/server/src/utils.rs index df2efdaa9..82f4f226e 100644 --- a/server/src/utils.rs +++ b/server/src/utils.rs @@ -63,7 +63,7 @@ pub fn date_to_prefix(date: NaiveDate) -> String { date.replace("UTC", "") } -pub fn custom_partition_to_prefix(custom_partition: HashMap) -> String { +pub fn custom_partition_to_prefix(custom_partition: &HashMap) -> String { let mut prefix = String::default(); for (key, value) in custom_partition.iter().sorted_by_key(|v| v.0) { prefix.push_str(&format!("{key}={value}/", key = key, value = value));