Skip to content

Commit

Permalink
fix: removed use of environment var P_STORAGE_UPLOAD_INTERVAL (#653)
Browse files Browse the repository at this point in the history
added const of 60 secs to be used for local to storage sync

fixes #651
  • Loading branch information
nikhilsinhaparseable authored Feb 7, 2024
1 parent d795e8e commit fb3fd21
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 41 deletions.
3 changes: 2 additions & 1 deletion server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ mod validator;
use option::CONFIG;

use crate::localcache::LocalCacheManager;
pub const STORAGE_UPLOAD_INTERVAL: u32 = 60;

#[actix_web::main]
async fn main() -> anyhow::Result<()> {
Expand Down Expand Up @@ -129,7 +130,7 @@ fn object_store_sync() -> (JoinHandle<()>, oneshot::Receiver<()>, oneshot::Sende
rt.block_on(async {
let mut scheduler = AsyncScheduler::new();
scheduler
.every((CONFIG.parseable.upload_interval as u32).seconds())
.every(STORAGE_UPLOAD_INTERVAL.seconds())
// Extra time interval is added so that this schedular does not race with local sync.
.plus(5u32.seconds())
.run(|| async {
Expand Down
30 changes: 0 additions & 30 deletions server/src/option.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,10 +215,6 @@ pub struct Server {
/// Size for local cache
pub local_cache_size: u64,

/// Interval in seconds after which uncommited data would be
/// uploaded to the storage platform.
pub upload_interval: u64,

/// Username for the basic authentication on the server
pub username: String,

Expand Down Expand Up @@ -284,10 +280,6 @@ impl FromArgMatches for Server {
.get_one::<u64>(Self::CACHE_SIZE)
.cloned()
.expect("default value for cache size");
self.upload_interval = m
.get_one::<u64>(Self::UPLOAD_INTERVAL)
.cloned()
.expect("default value for upload");
self.username = m
.get_one::<String>(Self::USERNAME)
.cloned()
Expand Down Expand Up @@ -381,7 +373,6 @@ impl Server {
pub const STAGING: &'static str = "local-staging-path";
pub const CACHE: &'static str = "cache-path";
pub const CACHE_SIZE: &'static str = "cache-size";
pub const UPLOAD_INTERVAL: &'static str = "upload-interval";
pub const USERNAME: &'static str = "username";
pub const PASSWORD: &'static str = "password";
pub const CHECK_UPDATE: &'static str = "check-update";
Expand Down Expand Up @@ -467,16 +458,6 @@ impl Server {
.help("Maximum allowed cache size for all streams combined (In human readable format, e.g 1GiB, 2GiB, 100MB)")
.next_line_help(true),
)
.arg(
Arg::new(Self::UPLOAD_INTERVAL)
.long(Self::UPLOAD_INTERVAL)
.env("P_STORAGE_UPLOAD_INTERVAL")
.value_name("SECONDS")
.default_value("60")
.value_parser(validation::upload_interval)
.help("Interval in seconds after which staging data would be sent to the storage")
.next_line_help(true),
)
.arg(
Arg::new(Self::USERNAME)
.long(Self::USERNAME)
Expand Down Expand Up @@ -677,7 +658,6 @@ pub mod validation {
use path_clean::PathClean;

use crate::option::MIN_CACHE_SIZE_BYTES;
use crate::storage::LOCAL_SYNC_INTERVAL;
use human_size::{multiples, SpecificSize};

pub fn file_path(s: &str) -> Result<PathBuf, String> {
Expand Down Expand Up @@ -755,14 +735,4 @@ pub mod validation {
}
Ok(size)
}

pub fn upload_interval(s: &str) -> Result<u64, String> {
let u = s
.parse::<u64>()
.map_err(|_| "invalid upload interval".to_string())?;
if u < LOCAL_SYNC_INTERVAL {
return Err("object storage upload interval must be 60 seconds or more".to_string());
}
Ok(u)
}
}
16 changes: 6 additions & 10 deletions server/src/storage/staging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,16 +134,12 @@ impl StorageDir {
.ends_with(&hot_filename)
});

//check if arrow files is not empty, fetch the parquet file path from last file from sorted arrow file list
if !(arrow_files.is_empty()) {
arrow_files.sort();
let key = Self::arrow_path_to_parquet(arrow_files.last().unwrap());
for arrow_file_path in arrow_files {
grouped_arrow_file
.entry(key.clone())
.or_default()
.push(arrow_file_path);
}
for arrow_file_path in arrow_files {
let key = Self::arrow_path_to_parquet(&arrow_file_path);
grouped_arrow_file
.entry(key)
.or_default()
.push(arrow_file_path);
}

grouped_arrow_file
Expand Down

0 comments on commit fb3fd21

Please sign in to comment.