diff --git a/Cargo.lock b/Cargo.lock index f740010071dc..f81dbc733ed6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8463,15 +8463,21 @@ dependencies = [ "async-compression 0.4.11", "async-trait", "async-walkdir", + "base64 0.21.7", "bitflags 2.5.0", "common-error", "common-macro", + "common-runtime", + "common-telemetry", + "common-test-util", "derive_builder 0.12.0", "futures", "lz4_flex 0.11.3", + "moka", "pin-project", "serde", "serde_json", + "sha2", "snafu 0.8.3", "tokio", "tokio-util", diff --git a/src/puffin/Cargo.toml b/src/puffin/Cargo.toml index 5e1a83f6ab7a..c26a60df351d 100644 --- a/src/puffin/Cargo.toml +++ b/src/puffin/Cargo.toml @@ -11,16 +11,24 @@ workspace = true async-compression = "0.4.11" async-trait.workspace = true async-walkdir = "2.0.0" +base64.workspace = true bitflags.workspace = true common-error.workspace = true common-macro.workspace = true +common-runtime.workspace = true +common-telemetry.workspace = true derive_builder.workspace = true futures.workspace = true lz4_flex = "0.11" +moka.workspace = true pin-project.workspace = true serde.workspace = true serde_json.workspace = true +sha2 = "0.10.8" snafu.workspace = true tokio.workspace = true tokio-util.workspace = true uuid.workspace = true + +[dev-dependencies] +common-test-util.workspace = true diff --git a/src/puffin/src/error.rs b/src/puffin/src/error.rs index 8dfb5f4575dc..8a28dffdcb54 100644 --- a/src/puffin/src/error.rs +++ b/src/puffin/src/error.rs @@ -14,6 +14,7 @@ use std::any::Any; use std::io::Error as IoError; +use std::sync::Arc; use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; @@ -80,6 +81,30 @@ pub enum Error { location: Location, }, + #[snafu(display("Failed to create"))] + Create { + #[snafu(source)] + error: IoError, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Failed to rename"))] + Rename { + #[snafu(source)] + error: IoError, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Failed to remove"))] + Remove { + #[snafu(source)] + error: IoError, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Error while walking directory"))] WalkDirError { #[snafu(source)] @@ -220,6 +245,9 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Get value from cache"))] + CacheGet { source: Arc }, } impl ErrorExt for Error { @@ -235,6 +263,9 @@ impl ErrorExt for Error { | Close { .. } | Open { .. } | Metadata { .. } + | Create { .. } + | Remove { .. } + | Rename { .. } | SerializeJson { .. } | BytesToInteger { .. } | ParseStageNotMatch { .. } @@ -254,6 +285,8 @@ impl ErrorExt for Error { } DuplicateBlob { .. } => StatusCode::InvalidArguments, + + CacheGet { source } => source.status_code(), } } diff --git a/src/puffin/src/puffin_manager.rs b/src/puffin/src/puffin_manager.rs index 96d1dfd51928..218f3d3c4749 100644 --- a/src/puffin/src/puffin_manager.rs +++ b/src/puffin/src/puffin_manager.rs @@ -19,6 +19,7 @@ pub mod file_accessor; use std::path::PathBuf; use async_trait::async_trait; +use futures::future::BoxFuture; use futures::{AsyncRead, AsyncSeek}; use crate::blob_metadata::CompressionCodec; @@ -69,12 +70,31 @@ pub struct PutOptions { /// The `PuffinReader` trait provides methods for reading blobs and directories from a Puffin file. #[async_trait] pub trait PuffinReader { - type Reader: AsyncRead + AsyncSeek; + type Blob: BlobGuard; + type Dir: DirGuard; /// Reads a blob from the Puffin file. - async fn blob(&self, key: &str) -> Result; + /// + /// The returned `BlobGuard` is used to access the blob data. + /// Users should hold the `BlobGuard` until they are done with the blob data. + async fn blob(&self, key: &str) -> Result; /// Reads a directory from the Puffin file. - /// The returned `PathBuf` is used to access the directory in the filesystem. - async fn dir(&self, key: &str) -> Result; + /// + /// The returned `DirGuard` is used to access the directory in the filesystem. + /// The caller is responsible for holding the `DirGuard` until they are done with the directory. + async fn dir(&self, key: &str) -> Result; +} + +/// `BlobGuard` is provided by the `PuffinReader` to access the blob data. +/// Users should hold the `BlobGuard` until they are done with the blob data. +pub trait BlobGuard { + type Reader: AsyncRead + AsyncSeek; + fn reader(&self) -> BoxFuture<'static, Result>; +} + +/// `DirGuard` is provided by the `PuffinReader` to access the directory in the filesystem. +/// Users should hold the `DirGuard` until they are done with the directory. +pub trait DirGuard { + fn path(&self) -> &PathBuf; } diff --git a/src/puffin/src/puffin_manager/cache_manager.rs b/src/puffin/src/puffin_manager/cache_manager.rs index e71ae5141d71..1f471236c0a7 100644 --- a/src/puffin/src/puffin_manager/cache_manager.rs +++ b/src/puffin/src/puffin_manager/cache_manager.rs @@ -12,14 +12,17 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod moka_cache_manager; + use std::path::PathBuf; use std::sync::Arc; use async_trait::async_trait; use futures::future::BoxFuture; -use futures::{AsyncRead, AsyncSeek, AsyncWrite}; +use futures::AsyncWrite; use crate::error::Result; +use crate::puffin_manager::{BlobGuard, DirGuard}; pub type BoxWriter = Box; @@ -39,34 +42,41 @@ pub type DirWriterProviderRef = Box; /// /// `CacheManager` will provide a `BoxWriter` that the caller of `get_blob` /// can use to write the blob into the cache. -pub trait InitBlobFn = FnOnce(BoxWriter) -> WriteResult; +pub trait InitBlobFn = Fn(BoxWriter) -> WriteResult; /// Function that initializes a directory. /// /// `CacheManager` will provide a `DirWriterProvider` that the caller of `get_dir` /// can use to write files inside the directory into the cache. -pub trait InitDirFn = FnOnce(DirWriterProviderRef) -> WriteResult; +pub trait InitDirFn = Fn(DirWriterProviderRef) -> WriteResult; /// `CacheManager` manages the cache for the puffin files. #[async_trait] pub trait CacheManager { - type Reader: AsyncRead + AsyncSeek; + type Blob: BlobGuard; + type Dir: DirGuard; /// Retrieves a blob, initializing it if necessary using the provided `init_fn`. + /// + /// The returned `BlobGuard` is used to access the blob reader. + /// The caller is responsible for holding the `BlobGuard` until they are done with the blob. async fn get_blob<'a>( &self, puffin_file_name: &str, key: &str, - init_factory: Box, - ) -> Result; + init_factory: Box, + ) -> Result; /// Retrieves a directory, initializing it if necessary using the provided `init_fn`. + /// + /// The returned `DirGuard` is used to access the directory in the filesystem. + /// The caller is responsible for holding the `DirGuard` until they are done with the directory. async fn get_dir<'a>( &self, puffin_file_name: &str, key: &str, - init_fn: Box, - ) -> Result; + init_fn: Box, + ) -> Result; /// Stores a directory in the cache. async fn put_dir( @@ -78,4 +88,4 @@ pub trait CacheManager { ) -> Result<()>; } -pub type CacheManagerRef = Arc + Send + Sync>; +pub type CacheManagerRef = Arc + Send + Sync>; diff --git a/src/puffin/src/puffin_manager/cache_manager/moka_cache_manager.rs b/src/puffin/src/puffin_manager/cache_manager/moka_cache_manager.rs new file mode 100644 index 000000000000..b2d4ed3bafd6 --- /dev/null +++ b/src/puffin/src/puffin_manager/cache_manager/moka_cache_manager.rs @@ -0,0 +1,939 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::path::PathBuf; +use std::sync::Arc; +use std::time::Duration; + +use async_trait::async_trait; +use async_walkdir::{Filtering, WalkDir}; +use base64::prelude::BASE64_URL_SAFE; +use base64::Engine; +use common_telemetry::{info, warn}; +use futures::future::BoxFuture; +use futures::{FutureExt, StreamExt}; +use moka::future::Cache; +use sha2::{Digest, Sha256}; +use snafu::ResultExt; +use tokio::fs; +use tokio::sync::mpsc::error::TrySendError; +use tokio::sync::mpsc::{Receiver, Sender}; +use tokio_util::compat::{Compat, TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt}; + +use crate::error::{ + CacheGetSnafu, CreateSnafu, MetadataSnafu, OpenSnafu, ReadSnafu, RemoveSnafu, RenameSnafu, + Result, WalkDirSnafu, +}; +use crate::puffin_manager::cache_manager::{ + BoxWriter, CacheManager, DirWriterProvider, InitBlobFn, InitDirFn, +}; +use crate::puffin_manager::{BlobGuard, DirGuard}; + +const DELETE_QUEUE_SIZE: usize = 10240; + +const TMP_EXTENSION: &str = "tmp"; +const DELETED_EXTENSION: &str = "deleted"; + +/// `MokaCacheManager` is a `CacheManager` that uses `moka` to manage cache. +pub struct MokaCacheManager { + /// The base directory of the cache. + base_dir: PathBuf, + + /// The cache maintaining the cache key to the size of the file or directory. + cache: Cache, + + /// The recycle bin for the deleted files and directories. + recycle_bin: Cache, + + /// The delete queue for the cleanup task. + /// + /// The lifetime of a guard is: + /// 1. initially inserted into the cache + /// 2. moved to the recycle bin when evicted + /// 2.1 moved back to the cache when accessed + /// 2.2 deleted from the recycle bin after a certain period + /// 3. sent the delete task to the delete queue on drop + /// 4. background routine removes the file or directory + delete_queue: Sender, +} + +impl MokaCacheManager { + #[allow(unused)] + pub async fn new(base_dir: PathBuf, max_size: u64) -> Result { + let recycle_bin = Cache::builder() + .time_to_live(Duration::from_secs(60)) + .build(); + + let recycle_bin_cloned = recycle_bin.clone(); + let cache = Cache::builder() + .max_capacity(max_size) + .weigher(|_: &String, v: &CacheValue| v.weight()) + .async_eviction_listener(move |k, v, _| { + let recycle_bin = recycle_bin_cloned.clone(); + async move { + recycle_bin.insert(k.as_str().to_string(), v).await; + } + .boxed() + }) + .build(); + + let (delete_queue, rx) = tokio::sync::mpsc::channel(DELETE_QUEUE_SIZE); + common_runtime::bg_runtime().spawn(Self::delete_routine(rx)); + + let manager = Self { + cache, + base_dir, + delete_queue, + recycle_bin, + }; + + manager.recover().await?; + + Ok(manager) + } +} + +#[async_trait] +impl CacheManager for MokaCacheManager { + type Blob = Arc; + type Dir = Arc; + + async fn get_blob<'a>( + &self, + puffin_file_name: &str, + key: &str, + init_fn: Box, + ) -> Result { + let cache_key = Self::encode_cache_key(puffin_file_name, key); + + let v = self + .cache + .try_get_with(cache_key.clone(), async { + if let Some(v) = self.recycle_bin.remove(&cache_key).await { + return Ok(v); + } + + let file_name = format!("{}.{}", cache_key, uuid::Uuid::new_v4()); + let path = self.base_dir.join(&file_name); + + let size = Self::write_blob(&path, &init_fn).await?; + + let guard = Arc::new(FsBlobGuard { + path, + delete_queue: self.delete_queue.clone(), + }); + Ok(CacheValue::File { guard, size }) + }) + .await + .context(CacheGetSnafu)?; + + match v { + CacheValue::File { guard, .. } => Ok(guard), + _ => unreachable!(), + } + } + + async fn get_dir<'a>( + &self, + puffin_file_name: &str, + key: &str, + init_fn: Box, + ) -> Result { + let cache_key = Self::encode_cache_key(puffin_file_name, key); + + let v = self + .cache + .try_get_with(cache_key.clone(), async { + if let Some(v) = self.recycle_bin.remove(&cache_key).await { + return Ok(v); + } + + let dir_name = format!("{}.{}", cache_key, uuid::Uuid::new_v4()); + let path = self.base_dir.join(&dir_name); + + let size = Self::write_dir(&path, &init_fn).await?; + + let guard = Arc::new(FsDirGuard { + path, + delete_queue: self.delete_queue.clone(), + }); + Ok(CacheValue::Dir { guard, size }) + }) + .await + .context(CacheGetSnafu)?; + + match v { + CacheValue::Dir { guard, .. } => Ok(guard), + _ => unreachable!(), + } + } + + async fn put_dir( + &self, + puffin_file_name: &str, + key: &str, + dir_path: PathBuf, + size: u64, + ) -> Result<()> { + let cache_key = Self::encode_cache_key(puffin_file_name, key); + + self.cache + .try_get_with(cache_key.clone(), async move { + if let Some(v) = self.recycle_bin.remove(&cache_key).await { + return Ok(v); + } + + let dir_name = format!("{}.{}", cache_key, uuid::Uuid::new_v4()); + let path = self.base_dir.join(&dir_name); + + fs::rename(&dir_path, &path).await.context(RenameSnafu)?; + + let guard = Arc::new(FsDirGuard { + path, + delete_queue: self.delete_queue.clone(), + }); + Ok(CacheValue::Dir { guard, size }) + }) + .await + .map(|_| ()) + .context(CacheGetSnafu) + } +} + +impl MokaCacheManager { + fn encode_cache_key(puffin_file_name: &str, key: &str) -> String { + let mut hasher = Sha256::new(); + hasher.update(puffin_file_name); + hasher.update(key); + hasher.update(puffin_file_name); + let hash = hasher.finalize(); + + BASE64_URL_SAFE.encode(hash) + } + + async fn write_blob( + target_path: &PathBuf, + init_fn: &(dyn InitBlobFn + Send + Sync + '_), + ) -> Result { + // To guarantee the atomicity of writing the file, we need to write + // the file to a temporary file first... + let tmp_path = target_path.with_extension(TMP_EXTENSION); + let writer = Box::new( + fs::File::create(&tmp_path) + .await + .context(CreateSnafu)? + .compat_write(), + ); + let size = init_fn(writer).await?; + + // ...then rename the temporary file to the target path + fs::rename(tmp_path, target_path) + .await + .context(RenameSnafu)?; + Ok(size) + } + + async fn write_dir( + target_path: &PathBuf, + init_fn: &(dyn InitDirFn + Send + Sync + '_), + ) -> Result { + // To guarantee the atomicity of writing the directory, we need to write + // the directory to a temporary directory first... + let tmp_base = target_path.with_extension(TMP_EXTENSION); + let writer_provider = Box::new(MokaDirWriterProvider(tmp_base.clone())); + let size = init_fn(writer_provider).await?; + + // ...then rename the temporary directory to the target path + fs::rename(&tmp_base, target_path) + .await + .context(RenameSnafu)?; + Ok(size) + } + + /// Recovers the cache by iterating through the cache directory. + async fn recover(&self) -> Result<()> { + let mut read_dir = fs::read_dir(&self.base_dir).await.context(ReadSnafu)?; + + let mut elems = HashMap::new(); + while let Some(entry) = read_dir.next_entry().await.context(ReadSnafu)? { + let path = entry.path(); + + if path.extension() == Some(TMP_EXTENSION.as_ref()) + || path.extension() == Some(DELETED_EXTENSION.as_ref()) + { + // Remove temporary or deleted files and directories + if entry.metadata().await.context(MetadataSnafu)?.is_dir() { + fs::remove_dir_all(path).await.context(RemoveSnafu)?; + } else { + fs::remove_file(path).await.context(RemoveSnafu)?; + } + } else { + // Insert the size of the file or directory to the cache + let meta = entry.metadata().await.context(MetadataSnafu)?; + let file_path = path.file_name().unwrap().to_string_lossy().into_owned(); + + // . + let key = match file_path.split('.').next() { + Some(key) => key.to_string(), + None => { + warn!( + "Invalid cache file name: {}, expected format: .", + file_path + ); + continue; + } + }; + + if meta.is_dir() { + let size = Self::get_dir_size(&path).await?; + let v = CacheValue::Dir { + guard: Arc::new(FsDirGuard { + path, + delete_queue: self.delete_queue.clone(), + }), + size, + }; + // A duplicate dir will be moved to the delete queue. + let _dup_dir = elems.insert(key, v); + } else { + let v = CacheValue::File { + guard: Arc::new(FsBlobGuard { + path, + delete_queue: self.delete_queue.clone(), + }), + size: meta.len(), + }; + // A duplicate file will be moved to the delete queue. + let _dup_file = elems.insert(key, v); + } + } + } + + for (key, value) in elems { + self.cache.insert(key, value).await; + } + + Ok(()) + } + + /// Walks through the directory and calculate the total size of all files in the directory. + async fn get_dir_size(path: &PathBuf) -> Result { + let mut size = 0; + let mut wd = WalkDir::new(path).filter(|entry| async move { + match entry.file_type().await { + Ok(ft) if ft.is_dir() => Filtering::Ignore, + _ => Filtering::Continue, + } + }); + + while let Some(entry) = wd.next().await { + let entry = entry.context(WalkDirSnafu)?; + size += entry.metadata().await.context(MetadataSnafu)?.len(); + } + + Ok(size) + } + + async fn delete_routine(mut receiver: Receiver) { + while let Some(task) = receiver.recv().await { + match task { + DeleteTask::File(path) => { + if let Err(err) = fs::remove_file(&path).await { + if err.kind() == std::io::ErrorKind::NotFound { + continue; + } + + warn!(err; "Failed to remove the file."); + } + } + DeleteTask::Dir(path) => { + let deleted_path = path.with_extension(DELETED_EXTENSION); + if let Err(err) = fs::rename(&path, &deleted_path).await { + if err.kind() == std::io::ErrorKind::NotFound { + continue; + } + + // Remove the deleted directory if the rename fails and retry + let _ = fs::remove_dir_all(&deleted_path).await; + if let Err(err) = fs::rename(&path, &deleted_path).await { + warn!(err; "Failed to rename the dangling directory to deleted path."); + continue; + } + } + if let Err(err) = fs::remove_dir_all(&deleted_path).await { + warn!(err; "Failed to remove the dangling directory."); + } + } + DeleteTask::Terminate => { + break; + } + } + } + + info!("The delete routine for moka cache manager is terminated."); + } +} + +impl Drop for MokaCacheManager { + fn drop(&mut self) { + let _ = self.delete_queue.try_send(DeleteTask::Terminate); + } +} + +#[derive(Debug, Clone)] +enum CacheValue { + File { guard: Arc, size: u64 }, + Dir { guard: Arc, size: u64 }, +} + +impl CacheValue { + fn size(&self) -> u64 { + match self { + CacheValue::File { size, .. } => *size, + CacheValue::Dir { size, .. } => *size, + } + } + + fn weight(&self) -> u32 { + self.size().try_into().unwrap_or(u32::MAX) + } +} + +enum DeleteTask { + File(PathBuf), + Dir(PathBuf), + Terminate, +} + +/// `FsBlobGuard` is a `BlobGuard` for accessing the blob and +/// automatically deleting the file on drop. +#[derive(Debug)] +pub struct FsBlobGuard { + path: PathBuf, + delete_queue: Sender, +} + +impl BlobGuard for Arc { + type Reader = Compat; + + fn reader(&self) -> BoxFuture<'static, Result> { + let path = self.path.clone(); + async move { + let file = fs::File::open(&path).await.context(OpenSnafu)?; + Ok(file.compat()) + } + .boxed() + } +} + +impl Drop for FsBlobGuard { + fn drop(&mut self) { + if let Err(err) = self + .delete_queue + .try_send(DeleteTask::File(self.path.clone())) + { + if matches!(err, TrySendError::Closed(_)) { + return; + } + warn!(err; "Failed to send the delete task for the file."); + } + } +} + +/// `FsDirGuard` is a `DirGuard` for accessing the directory and +/// automatically deleting the directory on drop. +#[derive(Debug)] +pub struct FsDirGuard { + path: PathBuf, + delete_queue: Sender, +} + +impl DirGuard for Arc { + fn path(&self) -> &PathBuf { + &self.path + } +} + +impl Drop for FsDirGuard { + fn drop(&mut self) { + if let Err(err) = self + .delete_queue + .try_send(DeleteTask::Dir(self.path.clone())) + { + if matches!(err, TrySendError::Closed(_)) { + return; + } + warn!(err; "Failed to send the delete task for the directory."); + } + } +} + +/// `MokaDirWriterProvider` implements `DirWriterProvider` for initializing a directory. +struct MokaDirWriterProvider(PathBuf); + +#[async_trait] +impl DirWriterProvider for MokaDirWriterProvider { + async fn writer(&self, rel_path: &str) -> Result { + let full_path = self.0.join(rel_path); + if let Some(parent) = full_path.parent() { + fs::create_dir_all(parent).await.context(CreateSnafu)?; + } + Ok(Box::new( + fs::File::create(full_path) + .await + .context(CreateSnafu)? + .compat_write(), + ) as BoxWriter) + } +} + +#[cfg(test)] +impl MokaCacheManager { + pub async fn must_get_file(&self, puffin_file_name: &str, key: &str) -> fs::File { + let cache_key = Self::encode_cache_key(puffin_file_name, key); + let value = self.cache.get(&cache_key).await.unwrap(); + let path = match &value { + CacheValue::File { guard, .. } => &guard.path, + _ => panic!("Expected a file, but got a directory."), + }; + fs::File::open(path).await.unwrap() + } + + pub async fn must_get_dir(&self, puffin_file_name: &str, key: &str) -> PathBuf { + let cache_key = Self::encode_cache_key(puffin_file_name, key); + let value = self.cache.get(&cache_key).await.unwrap(); + let path = match &value { + CacheValue::Dir { guard, .. } => &guard.path, + _ => panic!("Expected a directory, but got a file."), + }; + path.clone() + } + + pub fn in_cache(&self, puffin_file_name: &str, key: &str) -> bool { + let cache_key = Self::encode_cache_key(puffin_file_name, key); + self.cache.contains_key(&cache_key) + } +} + +#[cfg(test)] +mod tests { + use common_test_util::temp_dir::create_temp_dir; + use futures::{AsyncReadExt, AsyncWriteExt}; + use tokio::io::AsyncReadExt as _; + + use super::*; + use crate::error::BlobNotFoundSnafu; + use crate::puffin_manager::cache_manager::CacheManager; + + #[tokio::test] + async fn test_get_blob() { + let tempdir = create_temp_dir("test_get_blob_"); + let manager = MokaCacheManager::new(tempdir.path().to_path_buf(), u64::MAX) + .await + .unwrap(); + + let puffin_file_name = "test_get_blob"; + let key = "key"; + let mut reader = manager + .get_blob( + puffin_file_name, + key, + Box::new(|mut writer| { + Box::pin(async move { + writer.write_all(b"hello world").await.unwrap(); + Ok(11) + }) + }), + ) + .await + .unwrap() + .reader() + .await + .unwrap(); + + let mut buf = Vec::new(); + reader.read_to_end(&mut buf).await.unwrap(); + assert_eq!(buf, b"hello world"); + + let mut file = manager.must_get_file(puffin_file_name, key).await; + let mut buf = Vec::new(); + file.read_to_end(&mut buf).await.unwrap(); + assert_eq!(buf, b"hello world"); + } + + #[tokio::test] + async fn test_get_dir() { + let tempdir = create_temp_dir("test_get_dir_"); + let manager = MokaCacheManager::new(tempdir.path().to_path_buf(), u64::MAX) + .await + .unwrap(); + + let files_in_dir = [ + ("file_a", "Hello, world!".as_bytes()), + ("file_b", "Hello, Rust!".as_bytes()), + ("file_c", "你好,世界!".as_bytes()), + ("subdir/file_d", "Hello, Puffin!".as_bytes()), + ("subdir/subsubdir/file_e", "¡Hola mundo!".as_bytes()), + ]; + + let puffin_file_name = "test_get_dir"; + let key = "key"; + let dir_path = manager + .get_dir( + puffin_file_name, + key, + Box::new(|writer_provider| { + Box::pin(async move { + for (rel_path, content) in &files_in_dir { + let mut writer = writer_provider.writer(rel_path).await.unwrap(); + writer.write_all(content).await.unwrap(); + } + Ok(0) + }) + }), + ) + .await + .unwrap(); + + for (rel_path, content) in &files_in_dir { + let file_path = dir_path.path().join(rel_path); + let mut file = tokio::fs::File::open(&file_path).await.unwrap(); + let mut buf = Vec::new(); + file.read_to_end(&mut buf).await.unwrap(); + assert_eq!(buf, *content); + } + + let dir_path = manager.must_get_dir(puffin_file_name, key).await; + for (rel_path, content) in &files_in_dir { + let file_path = dir_path.join(rel_path); + let mut file = tokio::fs::File::open(&file_path).await.unwrap(); + let mut buf = Vec::new(); + file.read_to_end(&mut buf).await.unwrap(); + assert_eq!(buf, *content); + } + } + + #[tokio::test] + async fn test_recover() { + let tempdir = create_temp_dir("test_recover_"); + let manager = MokaCacheManager::new(tempdir.path().to_path_buf(), u64::MAX) + .await + .unwrap(); + + // initialize cache + let puffin_file_name = "test_recover"; + let blob_key = "blob_key"; + let guard = manager + .get_blob( + puffin_file_name, + blob_key, + Box::new(|mut writer| { + Box::pin(async move { + writer.write_all(b"hello world").await.unwrap(); + Ok(11) + }) + }), + ) + .await + .unwrap(); + drop(guard); + + let files_in_dir = [ + ("file_a", "Hello, world!".as_bytes()), + ("file_b", "Hello, Rust!".as_bytes()), + ("file_c", "你好,世界!".as_bytes()), + ("subdir/file_d", "Hello, Puffin!".as_bytes()), + ("subdir/subsubdir/file_e", "¡Hola mundo!".as_bytes()), + ]; + + let dir_key = "dir_key"; + let guard = manager + .get_dir( + puffin_file_name, + dir_key, + Box::new(|writer_provider| { + Box::pin(async move { + for (rel_path, content) in &files_in_dir { + let mut writer = writer_provider.writer(rel_path).await.unwrap(); + writer.write_all(content).await.unwrap(); + } + Ok(0) + }) + }), + ) + .await + .unwrap(); + drop(guard); + + // recover cache + drop(manager); + let manager = MokaCacheManager::new(tempdir.path().to_path_buf(), u64::MAX) + .await + .unwrap(); + + let mut reader = manager + .get_blob( + puffin_file_name, + blob_key, + Box::new(|_| Box::pin(async { Ok(0) })), + ) + .await + .unwrap() + .reader() + .await + .unwrap(); + let mut buf = Vec::new(); + reader.read_to_end(&mut buf).await.unwrap(); + assert_eq!(buf, b"hello world"); + + let dir_path = manager + .get_dir( + puffin_file_name, + dir_key, + Box::new(|_| Box::pin(async { Ok(0) })), + ) + .await + .unwrap(); + for (rel_path, content) in &files_in_dir { + let file_path = dir_path.path().join(rel_path); + let mut file = tokio::fs::File::open(&file_path).await.unwrap(); + let mut buf = Vec::new(); + file.read_to_end(&mut buf).await.unwrap(); + assert_eq!(buf, *content); + } + } + + #[tokio::test] + async fn test_eviction() { + let tempdir = create_temp_dir("test_eviction_"); + let manager = MokaCacheManager::new( + tempdir.path().to_path_buf(), + 1, /* extremely small size */ + ) + .await + .unwrap(); + + let puffin_file_name = "test_eviction"; + let blob_key = "blob_key"; + + // First time to get the blob + let mut reader = manager + .get_blob( + puffin_file_name, + blob_key, + Box::new(|mut writer| { + Box::pin(async move { + writer.write_all(b"Hello world").await.unwrap(); + Ok(11) + }) + }), + ) + .await + .unwrap() + .reader() + .await + .unwrap(); + + // The blob should be evicted + manager.cache.run_pending_tasks().await; + assert!(!manager.in_cache(puffin_file_name, blob_key)); + + let mut buf = Vec::new(); + reader.read_to_end(&mut buf).await.unwrap(); + assert_eq!(buf, b"Hello world"); + + // Second time to get the blob, get from recycle bin + let mut reader = manager + .get_blob( + puffin_file_name, + blob_key, + Box::new(|_| async { Ok(0) }.boxed()), + ) + .await + .unwrap() + .reader() + .await + .unwrap(); + + // The blob should be evicted + manager.cache.run_pending_tasks().await; + assert!(!manager.in_cache(puffin_file_name, blob_key)); + + let mut buf = Vec::new(); + reader.read_to_end(&mut buf).await.unwrap(); + assert_eq!(buf, b"Hello world"); + + let dir_key = "dir_key"; + let files_in_dir = [ + ("file_a", "Hello, world!".as_bytes()), + ("file_b", "Hello, Rust!".as_bytes()), + ("file_c", "你好,世界!".as_bytes()), + ("subdir/file_d", "Hello, Puffin!".as_bytes()), + ("subdir/subsubdir/file_e", "¡Hola mundo!".as_bytes()), + ]; + + // First time to get the directory + let guard_0 = manager + .get_dir( + puffin_file_name, + dir_key, + Box::new(|writer_provider| { + Box::pin(async move { + let mut size = 0; + for (rel_path, content) in &files_in_dir { + let mut writer = writer_provider.writer(rel_path).await.unwrap(); + writer.write_all(content).await.unwrap(); + size += content.len() as u64; + } + Ok(size) + }) + }), + ) + .await + .unwrap(); + + for (rel_path, content) in &files_in_dir { + let file_path = guard_0.path().join(rel_path); + let mut file = tokio::fs::File::open(&file_path).await.unwrap(); + let mut buf = Vec::new(); + file.read_to_end(&mut buf).await.unwrap(); + assert_eq!(buf, *content); + } + + // The directory should be evicted + manager.cache.run_pending_tasks().await; + assert!(!manager.in_cache(puffin_file_name, dir_key)); + + // Second time to get the directory + let guard_1 = manager + .get_dir( + puffin_file_name, + dir_key, + Box::new(|_| async { Ok(0) }.boxed()), + ) + .await + .unwrap(); + + for (rel_path, content) in &files_in_dir { + let file_path = guard_1.path().join(rel_path); + let mut file = tokio::fs::File::open(&file_path).await.unwrap(); + let mut buf = Vec::new(); + file.read_to_end(&mut buf).await.unwrap(); + assert_eq!(buf, *content); + } + + // Still hold the guard + manager.cache.run_pending_tasks().await; + assert!(!manager.in_cache(puffin_file_name, dir_key)); + + // Third time to get the directory and all guards are dropped + drop(guard_0); + drop(guard_1); + let guard_2 = manager + .get_dir( + puffin_file_name, + dir_key, + Box::new(|_| Box::pin(async move { Ok(0) })), + ) + .await + .unwrap(); + + // Still hold the guard, so the directory should not be removed even if it's evicted + manager.cache.run_pending_tasks().await; + assert!(!manager.in_cache(puffin_file_name, blob_key)); + + for (rel_path, content) in &files_in_dir { + let file_path = guard_2.path().join(rel_path); + let mut file = tokio::fs::File::open(&file_path).await.unwrap(); + let mut buf = Vec::new(); + file.read_to_end(&mut buf).await.unwrap(); + assert_eq!(buf, *content); + } + } + + #[tokio::test] + async fn test_get_blob_concurrency_on_fail() { + let tempdir = create_temp_dir("test_get_blob_concurrency_on_fail_"); + let manager = MokaCacheManager::new(tempdir.path().to_path_buf(), u64::MAX) + .await + .unwrap(); + + let puffin_file_name = "test_get_blob_concurrency_on_fail"; + let key = "key"; + + let manager = Arc::new(manager); + let handles = (0..10) + .map(|_| { + let manager = manager.clone(); + let task = async move { + let failed_init = Box::new(|_| { + async { + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + BlobNotFoundSnafu { blob: "whatever" }.fail() + } + .boxed() + }); + manager.get_blob(puffin_file_name, key, failed_init).await + }; + + tokio::spawn(task) + }) + .collect::>(); + + for handle in handles { + let r = handle.await.unwrap(); + assert!(r.is_err()); + } + + assert!(!manager.in_cache(puffin_file_name, key)); + } + + #[tokio::test] + async fn test_get_dir_concurrency_on_fail() { + let tempdir = create_temp_dir("test_get_dir_concurrency_on_fail_"); + let manager = MokaCacheManager::new(tempdir.path().to_path_buf(), u64::MAX) + .await + .unwrap(); + + let puffin_file_name = "test_get_dir_concurrency_on_fail"; + let key = "key"; + + let manager = Arc::new(manager); + let handles = (0..10) + .map(|_| { + let manager = manager.clone(); + let task = async move { + let failed_init = Box::new(|_| { + async { + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + BlobNotFoundSnafu { blob: "whatever" }.fail() + } + .boxed() + }); + manager.get_dir(puffin_file_name, key, failed_init).await + }; + + tokio::spawn(task) + }) + .collect::>(); + + for handle in handles { + let r = handle.await.unwrap(); + assert!(r.is_err()); + } + + assert!(!manager.in_cache(puffin_file_name, key)); + } +} diff --git a/src/puffin/src/puffin_manager/cached_puffin_manager/reader.rs b/src/puffin/src/puffin_manager/cached_puffin_manager/reader.rs index 323a0675620a..0aad8f25761f 100644 --- a/src/puffin/src/puffin_manager/cached_puffin_manager/reader.rs +++ b/src/puffin/src/puffin_manager/cached_puffin_manager/reader.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::path::PathBuf; - use async_compression::futures::bufread::ZstdDecoder; use async_trait::async_trait; use futures::future::BoxFuture; @@ -30,25 +28,25 @@ use crate::file_format::reader::{PuffinAsyncReader, PuffinFileReader}; use crate::puffin_manager::cache_manager::{BoxWriter, CacheManagerRef, DirWriterProviderRef}; use crate::puffin_manager::cached_puffin_manager::dir_meta::DirMetadata; use crate::puffin_manager::file_accessor::PuffinFileAccessorRef; -use crate::puffin_manager::PuffinReader; +use crate::puffin_manager::{BlobGuard, DirGuard, PuffinReader}; /// `CachedPuffinReader` is a `PuffinReader` that provides cached readers for puffin files. -pub struct CachedPuffinReader { +pub struct CachedPuffinReader { /// The name of the puffin file. puffin_file_name: String, /// The cache manager. - cache_manager: CacheManagerRef, + cache_manager: CacheManagerRef, /// The puffin file accessor. puffin_file_accessor: PuffinFileAccessorRef, } -impl CachedPuffinReader { +impl CachedPuffinReader { #[allow(unused)] pub(crate) fn new( puffin_file_name: String, - cache_manager: CacheManagerRef, + cache_manager: CacheManagerRef, puffin_file_accessor: PuffinFileAccessorRef, ) -> Self { Self { @@ -60,15 +58,17 @@ impl CachedPuffinReader { } #[async_trait] -impl PuffinReader for CachedPuffinReader +impl PuffinReader for CachedPuffinReader where + B: BlobGuard, + D: DirGuard, AR: AsyncRead + AsyncSeek + Send + Unpin + 'static, AW: AsyncWrite + 'static, - CR: AsyncRead + AsyncSeek, { - type Reader = CR; + type Blob = B; + type Dir = D; - async fn blob(&self, key: &str) -> Result { + async fn blob(&self, key: &str) -> Result { self.cache_manager .get_blob( self.puffin_file_name.as_str(), @@ -83,7 +83,7 @@ where .await } - async fn dir(&self, key: &str) -> Result { + async fn dir(&self, key: &str) -> Result { self.cache_manager .get_dir( self.puffin_file_name.as_str(), @@ -99,11 +99,12 @@ where } } -impl CachedPuffinReader +impl CachedPuffinReader where + B: BlobGuard, + G: DirGuard, AR: AsyncRead + AsyncSeek + Send + Unpin + 'static, AW: AsyncWrite + 'static, - CR: AsyncRead + AsyncSeek, { fn init_blob_to_cache( puffin_file_name: String, diff --git a/src/puffin/src/puffin_manager/cached_puffin_manager/writer.rs b/src/puffin/src/puffin_manager/cached_puffin_manager/writer.rs index b05e0e78d0d1..2b26e4a3553e 100644 --- a/src/puffin/src/puffin_manager/cached_puffin_manager/writer.rs +++ b/src/puffin/src/puffin_manager/cached_puffin_manager/writer.rs @@ -18,7 +18,7 @@ use std::path::PathBuf; use async_compression::futures::bufread::ZstdEncoder; use async_trait::async_trait; use futures::io::BufReader; -use futures::{AsyncRead, AsyncSeek, AsyncWrite, StreamExt}; +use futures::{AsyncRead, AsyncWrite, StreamExt}; use snafu::{ensure, ResultExt}; use tokio_util::compat::TokioAsyncReadCompatExt; use uuid::Uuid; @@ -31,15 +31,15 @@ use crate::error::{ use crate::file_format::writer::{Blob, PuffinAsyncWriter, PuffinFileWriter}; use crate::puffin_manager::cache_manager::CacheManagerRef; use crate::puffin_manager::cached_puffin_manager::dir_meta::{DirFileMetadata, DirMetadata}; -use crate::puffin_manager::{PuffinWriter, PutOptions}; +use crate::puffin_manager::{BlobGuard, DirGuard, PuffinWriter, PutOptions}; /// `CachedPuffinWriter` is a `PuffinWriter` that writes blobs and directories to a puffin file. -pub struct CachedPuffinWriter { +pub struct CachedPuffinWriter { /// The name of the puffin file. puffin_file_name: String, /// The cache manager. - cache_manager: CacheManagerRef, + cache_manager: CacheManagerRef, /// The underlying `PuffinFileWriter`. puffin_file_writer: PuffinFileWriter, @@ -48,11 +48,11 @@ pub struct CachedPuffinWriter { blob_keys: HashSet, } -impl CachedPuffinWriter { +impl CachedPuffinWriter { #[allow(unused)] pub(crate) fn new( puffin_file_name: String, - cache_manager: CacheManagerRef, + cache_manager: CacheManagerRef, writer: W, ) -> Self { Self { @@ -65,9 +65,10 @@ impl CachedPuffinWriter { } #[async_trait] -impl PuffinWriter for CachedPuffinWriter +impl PuffinWriter for CachedPuffinWriter where - CR: AsyncRead + AsyncSeek, + B: BlobGuard, + D: DirGuard, W: AsyncWrite + Unpin + Send, { async fn put_blob(&mut self, key: &str, raw_data: R, options: PutOptions) -> Result