Commit ce8e5a1

Merge branch 'main' into update-schema-panic

nikhilsinhaparseable authored Jun 27, 2024
2 parents 5b1ccc6 + c2164eb

Showing 4 changed files with 67 additions and 25 deletions.

server/src/storage/localfs.rs (5 additions, 2 deletions)

@@ -31,8 +31,11 @@ use relative_path::{RelativePath, RelativePathBuf};
 use tokio::fs::{self, DirEntry};
 use tokio_stream::wrappers::ReadDirStream;
 
-use crate::metrics::storage::{localfs::REQUEST_RESPONSE_TIME, StorageMetrics};
 use crate::option::validation;
+use crate::{
+    handlers::http::users::USERS_ROOT_DIR,
+    metrics::storage::{localfs::REQUEST_RESPONSE_TIME, StorageMetrics},
+};
 
 use super::{
     LogStream, ObjectStorage, ObjectStorageError, ObjectStorageProvider, PARSEABLE_ROOT_DIRECTORY,
@@ -291,7 +294,7 @@ impl ObjectStorage for LocalFS {
     }
 
     async fn list_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError> {
-        let ignore_dir = &["lost+found", PARSEABLE_ROOT_DIRECTORY];
+        let ignore_dir = &["lost+found", PARSEABLE_ROOT_DIRECTORY, USERS_ROOT_DIR];
         let directories = ReadDirStream::new(fs::read_dir(&self.root).await?);
         let entries: Vec<DirEntry> = directories.try_collect().await?;
         let entries = entries
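
Note: list_streams() treats every directory under the storage root as a log
stream unless its name is on the ignore list; adding USERS_ROOT_DIR keeps
per-user assets (such as the saved filters changed below) from being listed
as streams. A minimal standalone sketch of that name-based filtering, with
invented constant values (the real ones live elsewhere in the crate):

    use std::path::Path;

    // Invented values, for illustration only.
    const PARSEABLE_ROOT_DIRECTORY: &str = ".parseable";
    const USERS_ROOT_DIR: &str = "users";

    /// Returns true when a directory should be skipped during stream discovery.
    fn is_ignored(dir: &Path) -> bool {
        let ignore_dir = ["lost+found", PARSEABLE_ROOT_DIRECTORY, USERS_ROOT_DIR];
        dir.file_name()
            .and_then(|name| name.to_str())
            .map(|name| ignore_dir.contains(&name))
            .unwrap_or(true) // skip anything without a valid UTF-8 name
    }

    fn main() {
        assert!(is_ignored(Path::new("/data/users")));
        assert!(!is_ignored(Path::new("/data/app-logs")));
    }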

server/src/storage/staging.rs (29 additions, 19 deletions)

@@ -168,11 +168,19 @@ impl StorageDir {
         let random_string =
             rand::distributions::Alphanumeric.sample_string(&mut rand::thread_rng(), 15);
         for arrow_file_path in arrow_files {
-            let key = Self::arrow_path_to_parquet(&arrow_file_path, random_string.clone());
-            grouped_arrow_file
-                .entry(key)
-                .or_default()
-                .push(arrow_file_path);
+            if arrow_file_path.metadata().unwrap().len() == 0 {
+                log::error!(
+                    "Invalid arrow file detected, removing it: {:?}",
+                    arrow_file_path
+                );
+                fs::remove_file(&arrow_file_path).unwrap();
+            } else {
+                let key = Self::arrow_path_to_parquet(&arrow_file_path, random_string.clone());
+                grouped_arrow_file
+                    .entry(key)
+                    .or_default()
+                    .push(arrow_file_path);
+            }
         }
         grouped_arrow_file
     }
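
Note: this guard is the heart of the fix: a zero-byte .arrows file left in
staging would otherwise be grouped for parquet conversion, which is
presumably the panic this branch addresses, so it is now logged and deleted
up front. The same guard as a standalone sketch, using io::Result in place
of the unwraps above:

    use std::{fs, io, path::Path};

    /// Ok(true) means the file was empty and has been removed, so the
    /// caller should skip it; Ok(false) means it is safe to process.
    fn discard_if_empty(path: &Path) -> io::Result<bool> {
        if fs::metadata(path)?.len() == 0 {
            eprintln!("Invalid arrow file detected, removing it: {:?}", path);
            fs::remove_file(path)?;
            return Ok(true);
        }
        Ok(false)
    }

    fn main() -> io::Result<()> {
        fs::write("empty.arrows", b"")?; // simulate a truncated staging file
        assert!(discard_if_empty(Path::new("empty.arrows"))?);
        Ok(())
    }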
@@ -255,34 +263,36 @@ pub fn convert_disk_files_to_parquet(
             custom_partition_fields.insert(custom_partition_field.to_string(), index);
         }
     }
-    let parquet_file = fs::File::create(&parquet_path).map_err(|_| MoveDataError::Create)?;
     let props = parquet_writer_props(
         time_partition.clone(),
         index_time_partition,
         custom_partition_fields,
     )
     .build();
 
     schemas.push(merged_schema.clone());
     let schema = Arc::new(merged_schema);
-    let mut writer = ArrowWriter::try_new(parquet_file, schema.clone(), Some(props))?;
+    let parquet_file = fs::File::create(&parquet_path).map_err(|_| MoveDataError::Create)?;
+    let mut writer = ArrowWriter::try_new(&parquet_file, schema.clone(), Some(props))?;
     for ref record in record_reader.merged_iter(schema, time_partition.clone()) {
         writer.write(record)?;
     }
 
     writer.close()?;
 
-    for file in files {
-        let file_size = file.metadata().unwrap().len();
-        let file_type = file.extension().unwrap().to_str().unwrap();
-
-        if fs::remove_file(file.clone()).is_err() {
-            log::error!("Failed to delete file. Unstable state");
-            process::abort()
-        }
-        metrics::STORAGE_SIZE
-            .with_label_values(&["staging", stream, file_type])
-            .sub(file_size as i64);
-    }
+    if parquet_file.metadata().unwrap().len() == 0 {
+        log::error!("Invalid parquet file detected, removing it");
+        fs::remove_file(parquet_path).unwrap();
+    } else {
+        for file in files {
+            let file_size = file.metadata().unwrap().len();
+            let file_type = file.extension().unwrap().to_str().unwrap();
+            if fs::remove_file(file.clone()).is_err() {
+                log::error!("Failed to delete file. Unstable state");
+                process::abort()
+            }
+            metrics::STORAGE_SIZE
+                .with_label_values(&["staging", stream, file_type])
+                .sub(file_size as i64);
+        }
+    }

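Note: two things change here. File::create now happens right before the
writer is built, and ArrowWriter writes through &parquet_file (possible
because &File implements Write), so the handle stays usable after close().
That lets the code delete an empty parquet file instead of keeping it, and
the staged arrow files are removed (with the staging size gauge
decremented) only when the output is non-empty. The borrow-then-validate
pattern in miniature:

    use std::{fs, io::{self, Write}, path::Path};

    fn write_then_validate(path: &Path, payload: &[u8]) -> io::Result<()> {
        let file = fs::File::create(path)?;
        // Write through a shared borrow, as the diff does with ArrowWriter,
        // so `file` remains usable after writing is done.
        (&file).write_all(payload)?;
        if file.metadata()?.len() == 0 {
            eprintln!("Invalid output file detected, removing it");
            fs::remove_file(path)?;
        }
        Ok(())
    }

    fn main() -> io::Result<()> {
        write_then_validate(Path::new("out.bin"), b"")?;      // removed again
        write_then_validate(Path::new("out2.bin"), b"data")?; // kept
        Ok(())
    }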

server/src/users/filters.rs (32 additions, 3 deletions)

@@ -26,16 +26,45 @@ use crate::{handlers::http::users::USERS_ROOT_DIR, metadata::LOCK_EXPECT, option
 
 pub static FILTERS: Lazy<Filters> = Lazy::new(Filters::default);
 
-#[derive(Debug, Serialize, Deserialize, Clone)]
+#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
 pub struct Filter {
     version: String,
     stream_name: String,
     filter_name: String,
     filter_id: String,
-    query: String,
+    query: FilterQuery,
     time_filter: Option<TimeFilter>,
 }
 
+#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
+pub struct FilterQuery {
+    filter_type: String,
+    filter_query: Option<String>,
+    filter_builder: Option<FilterBuilder>,
+}
+
+#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
+pub struct FilterBuilder {
+    id: String,
+    combinator: String,
+    rules: Vec<FilterRules>,
+}
+
+#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
+pub struct FilterRules {
+    id: String,
+    combinator: String,
+    rules: Vec<Rules>,
+}
+
+#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
+pub struct Rules {
+    id: String,
+    field: String,
+    value: String,
+    operator: String,
+}
+
 impl Filter {
     pub fn filter_id(&self) -> &str {
         &self.filter_id
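
Note: the saved-filter query changes from a plain string to a structured
FilterQuery that can carry either a raw query or a query-builder tree. As a
sketch of the JSON shape these definitions accept, here is a self-contained
round-trip using mirror copies of the new types (field names come from the
diff; every value is invented):

    use serde::{Deserialize, Serialize};

    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    struct FilterQuery {
        filter_type: String,
        filter_query: Option<String>,
        filter_builder: Option<FilterBuilder>,
    }

    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    struct FilterBuilder {
        id: String,
        combinator: String,
        rules: Vec<FilterRules>,
    }

    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    struct FilterRules {
        id: String,
        combinator: String,
        rules: Vec<Rules>,
    }

    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    struct Rules {
        id: String,
        field: String,
        value: String,
        operator: String,
    }

    fn main() {
        let raw = r#"{
            "filter_type": "builder",
            "filter_query": null,
            "filter_builder": {
                "id": "root", "combinator": "and",
                "rules": [{
                    "id": "group-1", "combinator": "or",
                    "rules": [{ "id": "rule-1", "field": "status",
                                "value": "500", "operator": "=" }]
                }]
            }
        }"#;
        let parsed: FilterQuery = serde_json::from_str(raw).unwrap();
        // The PartialEq/Eq derives added in this commit make this possible:
        assert_eq!(parsed, serde_json::from_str(raw).unwrap());
    }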
@@ -70,7 +99,7 @@ impl Filters {
 
     pub fn update(&self, filter: Filter) {
         let mut s = self.0.write().expect(LOCK_EXPECT);
-
+        s.retain(|f| f.filter_id() != filter.filter_id());
         s.push(filter);
     }
 
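
Note: update() previously pushed unconditionally, so saving a filter twice
left duplicates in the in-memory list; retain-then-push gives it upsert
semantics keyed on filter_id. The same pattern in miniature, with a
hypothetical Item type standing in for Filter:

    #[derive(Debug, Clone, PartialEq, Eq)]
    struct Item {
        id: String,
        payload: String,
    }

    /// Replace-or-insert: at most one item per id survives.
    fn upsert(items: &mut Vec<Item>, item: Item) {
        items.retain(|i| i.id != item.id);
        items.push(item);
    }

    fn main() {
        let mut items = vec![Item { id: "a".into(), payload: "v1".into() }];
        upsert(&mut items, Item { id: "a".into(), payload: "v2".into() });
        assert_eq!(items.len(), 1);
        assert_eq!(items[0].payload, "v2");
    }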

server/src/users/mod.rs (1 addition, 1 deletion)

@@ -21,7 +21,7 @@ pub mod filters;
 
 use serde::{Deserialize, Serialize};
 
-#[derive(Debug, Serialize, Deserialize, Default, Clone)]
+#[derive(Debug, Serialize, Deserialize, Default, Clone, PartialEq, Eq)]
 pub struct TimeFilter {
     to: String,
     from: String,
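
Note: TimeFilter picks up PartialEq/Eq because Filter (above) now derives
Eq while holding an Option<TimeFilter>, and a derived Eq compiles only when
every field's type is itself Eq. The constraint in miniature:

    #[derive(PartialEq, Eq)]
    struct Inner; // removing these derives breaks Outer's derive(Eq)

    #[derive(PartialEq, Eq)]
    struct Outer {
        inner: Option<Inner>,
    }

    fn main() {
        assert!(Outer { inner: Some(Inner) } == Outer { inner: Some(Inner) });
    }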
