Skip to content

Commit

Permalink
fix: issue creating multiple cache files
Browse files Browse the repository at this point in the history
  • Loading branch information
Eshanatnight committed May 24, 2024
1 parent 54db5ef commit 17d07d6
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 12 deletions.
11 changes: 10 additions & 1 deletion server/src/handlers/http/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,15 @@ pub async fn put_results_in_cache(
}

let user_id = user_id.ok_or(CacheError::Other("User Id not provided"))?;
let mut cache = query_cache_manager.get_cache(stream, user_id).await?;

let cache_key = CacheMetadata::new(query.clone(), start.clone(), end.clone());

// guard to stop multiple caching of the same content
if let Some(path) = cache.get_file(&cache_key) {
log::info!("File already exists in cache, Removing old file");
cache.delete(&cache_key, path).await?;
}

if let Err(err) = query_cache_manager
.create_parquet_cache(stream, records, user_id, start, end, query)
Expand Down Expand Up @@ -262,7 +271,7 @@ pub async fn get_results_from_cache(

let (start, end) = parse_human_time(start_time, end_time)?;

let file_path = query_cache.get_file(CacheMetadata::new(
let file_path = query_cache.get_file(&CacheMetadata::new(
query.to_string(),
start.to_rfc3339(),
end.to_rfc3339(),
Expand Down
24 changes: 13 additions & 11 deletions server/src/querycache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,16 +75,23 @@ impl QueryCache {
}
}

pub fn get_file(&mut self, key: CacheMetadata) -> Option<PathBuf> {
self.files.get(&key).cloned()
pub fn get_file(&mut self, key: &CacheMetadata) -> Option<PathBuf> {
self.files.get(key).cloned()
}

pub fn used_cache_size(&self) -> u64 {
self.current_size
}

pub fn remove(&mut self, key: CacheMetadata) -> Option<PathBuf> {
self.files.remove(&key)
pub fn remove(&mut self, key: &CacheMetadata) -> Option<PathBuf> {
self.files.remove(key)
}

pub async fn delete(&mut self, key: &CacheMetadata, path: PathBuf) -> Result<(), CacheError> {
self.files.delete(key);
AsyncFs::remove_file(path).await?;

Ok(())
}

pub fn queries(&self) -> Vec<&CacheMetadata> {
Expand Down Expand Up @@ -257,7 +264,7 @@ impl QueryCacheManager {
) -> Result<(), CacheError> {
let mut cache = self.get_cache(stream, user_id).await?;

if let Some(remove_result) = cache.remove(key) {
if let Some(remove_result) = cache.remove(&key) {
self.put_cache(stream, &cache, user_id).await?;
tokio::spawn(fs::remove_file(remove_result));
Ok(())
Expand Down Expand Up @@ -339,12 +346,7 @@ impl QueryCacheManager {
return Ok(());
};


let mut arrow_writer = AsyncArrowWriter::try_new(
parquet_file,
sch,
Some(props),
)?;
let mut arrow_writer = AsyncArrowWriter::try_new(parquet_file, sch, Some(props))?;

for record in records {
if let Err(e) = arrow_writer.write(record).await {
Expand Down

0 comments on commit 17d07d6

Please sign in to comment.