fix: issues caused by prev commit #800

Merged
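Read off the diff, the fix is threefold: `custom_partition_values` now travels through the write path as `&HashMap<String, String>` instead of by value (cloned only where `append_to_local` actually keeps it), `Event::process_unchecked` takes `&self` instead of consuming `self`, and `push_logs_unchecked` now populates the `custom_partition_values` field (empty map) and returns the event after processing.

A minimal sketch of the ownership change, using hypothetical pared-down types (`Event` reduced to the two relevant fields, a free `process_event` standing in for the real call chain), not the actual Parseable code:

```rust
use std::collections::HashMap;

struct Event {
    stream_name: String,
    custom_partition_values: HashMap<String, String>,
}

// The write path borrows the map and clones only at the point that
// actually stores it (append_to_local in the real code).
fn process_event(stream_name: &str, custom_partition_values: &HashMap<String, String>) {
    let _stored: HashMap<String, String> = custom_partition_values.clone();
    println!("processed event for stream {stream_name}");
}

impl Event {
    // `&self` instead of `self`: processing no longer consumes the event.
    fn process_unchecked(&self) {
        process_event(&self.stream_name, &self.custom_partition_values);
    }
}

fn main() {
    let unchecked_event = Event {
        stream_name: "demo".to_string(),
        custom_partition_values: HashMap::new(), // empty for unchecked pushes
    };
    unchecked_event.process_unchecked();
    // With the old `process_unchecked(self)` the event would be moved out
    // by now; after this PR the caller can still hand it back, as
    // push_logs_unchecked does in the ingest.rs hunk below.
    println!("{} partition values", unchecked_event.custom_partition_values.len());
}
```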
13 changes: 6 additions & 7 deletions server/src/event.rs
@@ -74,7 +74,7 @@ impl Event {
             &key,
             self.rb.clone(),
             self.parsed_timestamp,
-            self.custom_partition_values,
+            &self.custom_partition_values,
         )?;

         metadata::STREAM_INFO.update_stats(
@@ -96,18 +96,17 @@ impl Event {
         Ok(())
     }

-    pub fn process_unchecked(self) -> Result<Self, PostError> {
+    pub fn process_unchecked(&self) -> Result<(), PostError> {
         let key = get_schema_key(&self.rb.schema().fields);

         Self::process_event(
             &self.stream_name,
             &key,
             self.rb.clone(),
             self.parsed_timestamp,
+            &self.custom_partition_values,
         )
-        .map_err(PostError::Event)?;
-
-        Ok(self)
+        .map_err(PostError::Event)
     }

     pub fn clear(&self, stream_name: &str) {
@@ -121,14 +120,14 @@ impl Event {
         schema_key: &str,
         rb: RecordBatch,
         parsed_timestamp: NaiveDateTime,
-        custom_partition_values: HashMap<String, String>,
+        custom_partition_values: &HashMap<String, String>,
     ) -> Result<(), EventError> {
         STREAM_WRITERS.append_to_local(
             stream_name,
             schema_key,
             rb,
             parsed_timestamp,
-            custom_partition_values,
+            custom_partition_values.clone(),
         )?;
         Ok(())
     }
16 changes: 13 additions & 3 deletions server/src/event/writer.rs
@@ -53,7 +53,7 @@ impl Writer {
         schema_key: &str,
         rb: RecordBatch,
         parsed_timestamp: NaiveDateTime,
-        custom_partition_values: HashMap<String, String>,
+        custom_partition_values: &HashMap<String, String>,
     ) -> Result<(), StreamWriterError> {
         let rb = utils::arrow::replace_columns(
             rb.schema(),
@@ -102,15 +102,22 @@ impl WriterTable {
                     schema_key,
                     record,
                     parsed_timestamp,
-                    custom_partition_values,
+                    &custom_partition_values,
                 )?;
             }
             None => {
                 drop(hashmap_guard);
                 let map = self.write().unwrap();
                 // check for race condition
                 // if map contains entry then just
-                self.handle_missing_writer(map, stream_name, schema_key, record, parsed_timestamp)?;
+                self.handle_missing_writer(
+                    map,
+                    stream_name,
+                    schema_key,
+                    record,
+                    parsed_timestamp,
+                    &custom_partition_values,
+                )?;
             }
         };
         Ok(())
@@ -123,13 +130,15 @@
         schema_key: &str,
         record: RecordBatch,
         parsed_timestamp: NaiveDateTime,
+        custom_partition_values: &HashMap<String, String>,
     ) -> Result<(), StreamWriterError> {
         if CONFIG.parseable.mode != Mode::Query {
             stream_writer.lock().unwrap().push(
                 stream_name,
                 schema_key,
                 record,
                 parsed_timestamp,
+                custom_partition_values,
             )?;
         } else {
             stream_writer
@@ -148,6 +157,7 @@
         schema_key: &str,
         record: RecordBatch,
         parsed_timestamp: NaiveDateTime,
+        custom_partition_values: &HashMap<String, String>,
     ) -> Result<(), StreamWriterError> {
         match map.get(stream_name) {
             Some(writer) => {
4 changes: 2 additions & 2 deletions server/src/event/writer/file_writer.rs
@@ -45,7 +45,7 @@ impl FileWriter {
         schema_key: &str,
         record: &RecordBatch,
         parsed_timestamp: NaiveDateTime,
-        custom_partition_values: HashMap<String, String>,
+        custom_partition_values: &HashMap<String, String>,
     ) -> Result<(), StreamWriterError> {
         match self.get_mut(schema_key) {
             Some(writer) => {
@@ -89,7 +89,7 @@ fn init_new_stream_writer_file(
     schema_key: &str,
     record: &RecordBatch,
     parsed_timestamp: NaiveDateTime,
-    custom_partition_values: HashMap<String, String>,
+    custom_partition_values: &HashMap<String, String>,
 ) -> Result<(PathBuf, StreamWriter<std::fs::File>), StreamWriterError> {
     let dir = StorageDir::new(stream_name);
     let path = dir.path_by_current_time(schema_key, parsed_timestamp, custom_partition_values);
11 changes: 7 additions & 4 deletions server/src/handlers/http/ingest.rs
@@ -102,16 +102,19 @@ pub async fn push_logs_unchecked(
     batches: RecordBatch,
     stream_name: &str,
 ) -> Result<event::Event, PostError> {
-    event::Event {
+    let unchecked_event = event::Event {
         rb: batches,
         stream_name: stream_name.to_string(),
         origin_format: "json",
         origin_size: 0,
         parsed_timestamp: Utc::now().naive_utc(),
         time_partition: None,
-        is_first_event: true, // NOTE: Maybe should be false
-    }
-    .process_unchecked()
+        is_first_event: true, // NOTE: Maybe should be false
+        custom_partition_values: HashMap::new(), // should be an empty map for unchecked push
+    };
+    unchecked_event.process_unchecked()?;
+
+    Ok(unchecked_event)
 }

 async fn push_logs(stream_name: String, req: HttpRequest, body: Bytes) -> Result<(), PostError> {
23 changes: 11 additions & 12 deletions server/src/storage/staging.rs
@@ -52,7 +52,7 @@ use std::{
 };

 const ARROW_FILE_EXTENSION: &str = "data.arrows";
-const PARQUET_FILE_EXTENSION: &str = "data.parquet";
+// const PARQUET_FILE_EXTENSION: &str = "data.parquet";

 #[derive(Debug)]
 pub struct StorageDir {
@@ -68,7 +68,7 @@ impl StorageDir {

     pub fn file_time_suffix(
         time: NaiveDateTime,
-        custom_partition_values: HashMap<String, String>,
+        custom_partition_values: &HashMap<String, String>,
         extention: &str,
     ) -> String {
         let mut uri = utils::date_to_prefix(time.date())
@@ -90,7 +90,7 @@
     fn filename_by_time(
         stream_hash: &str,
         time: NaiveDateTime,
-        custom_partition_values: HashMap<String, String>,
+        custom_partition_values: &HashMap<String, String>,
     ) -> String {
         format!(
             "{}.{}",
@@ -102,7 +102,7 @@
     fn filename_by_current_time(
         stream_hash: &str,
         parsed_timestamp: NaiveDateTime,
-        custom_partition_values: HashMap<String, String>,
+        custom_partition_values: &HashMap<String, String>,
     ) -> String {
         Self::filename_by_time(stream_hash, parsed_timestamp, custom_partition_values)
     }
@@ -111,7 +111,7 @@
         &self,
         stream_hash: &str,
         parsed_timestamp: NaiveDateTime,
-        custom_partition_values: HashMap<String, String>,
+        custom_partition_values: &HashMap<String, String>,
     ) -> PathBuf {
         let server_time_in_min = Utc::now().format("%Y%m%dT%H%M").to_string();
         let mut filename =
@@ -201,13 +201,12 @@ impl StorageDir {
     }
 }

-#[allow(unused)]
-pub fn to_parquet_path(stream_name: &str, time: NaiveDateTime) -> PathBuf {
-    let data_path = CONFIG.parseable.local_stream_data_path(stream_name);
-    let dir = StorageDir::file_time_suffix(time, HashMap::new(), PARQUET_FILE_EXTENSION);
-
-    data_path.join(dir)
-}
+// pub fn to_parquet_path(stream_name: &str, time: NaiveDateTime) -> PathBuf {
+//     let data_path = CONFIG.parseable.local_stream_data_path(stream_name);
+//     let dir = StorageDir::file_time_suffix(time, &HashMap::new(), PARQUET_FILE_EXTENSION);
+//
+//     data_path.join(dir)
+// }

 pub fn convert_disk_files_to_parquet(
     stream: &str,
2 changes: 1 addition & 1 deletion server/src/utils.rs
@@ -63,7 +63,7 @@ pub fn date_to_prefix(date: NaiveDate) -> String {
     date.replace("UTC", "")
 }

-pub fn custom_partition_to_prefix(custom_partition: HashMap<String, String>) -> String {
+pub fn custom_partition_to_prefix(custom_partition: &HashMap<String, String>) -> String {
     let mut prefix = String::default();
     for (key, value) in custom_partition.iter().sorted_by_key(|v| v.0) {
         prefix.push_str(&format!("{key}={value}/", key = key, value = value));