Skip to content

Commit

Permalink
feat(puffin): implement MokaCacheManager (#4211)
Browse files Browse the repository at this point in the history
* feat(puffin): implement MokaCacheManager

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* chore: polish

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: clippy

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* chore: +1s

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: corner case to get a blob

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: keep dir in used

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: add more tests

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* chore: add doc comments

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: toml format

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* chore: rename unreleased_dirs

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* chore: refine some comments

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: handle more cornor cases

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* chore: refine

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* refactor: simplify

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* chore: more explanation

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: use recycle bin

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: remove instead

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* chore: address comment

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: remove unnecessary removing

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
  • Loading branch information
zhongzc authored Jul 1, 2024
1 parent f035a7c commit b69b24a
Show file tree
Hide file tree
Showing 8 changed files with 1,053 additions and 35 deletions.
6 changes: 6 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions src/puffin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,24 @@ workspace = true
async-compression = "0.4.11"
async-trait.workspace = true
async-walkdir = "2.0.0"
base64.workspace = true
bitflags.workspace = true
common-error.workspace = true
common-macro.workspace = true
common-runtime.workspace = true
common-telemetry.workspace = true
derive_builder.workspace = true
futures.workspace = true
lz4_flex = "0.11"
moka.workspace = true
pin-project.workspace = true
serde.workspace = true
serde_json.workspace = true
sha2 = "0.10.8"
snafu.workspace = true
tokio.workspace = true
tokio-util.workspace = true
uuid.workspace = true

[dev-dependencies]
common-test-util.workspace = true
33 changes: 33 additions & 0 deletions src/puffin/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use std::any::Any;
use std::io::Error as IoError;
use std::sync::Arc;

use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
Expand Down Expand Up @@ -80,6 +81,30 @@ pub enum Error {
location: Location,
},

#[snafu(display("Failed to create"))]
Create {
#[snafu(source)]
error: IoError,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Failed to rename"))]
Rename {
#[snafu(source)]
error: IoError,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Failed to remove"))]
Remove {
#[snafu(source)]
error: IoError,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Error while walking directory"))]
WalkDirError {
#[snafu(source)]
Expand Down Expand Up @@ -220,6 +245,9 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Get value from cache"))]
CacheGet { source: Arc<Error> },
}

impl ErrorExt for Error {
Expand All @@ -235,6 +263,9 @@ impl ErrorExt for Error {
| Close { .. }
| Open { .. }
| Metadata { .. }
| Create { .. }
| Remove { .. }
| Rename { .. }
| SerializeJson { .. }
| BytesToInteger { .. }
| ParseStageNotMatch { .. }
Expand All @@ -254,6 +285,8 @@ impl ErrorExt for Error {
}

DuplicateBlob { .. } => StatusCode::InvalidArguments,

CacheGet { source } => source.status_code(),
}
}

Expand Down
28 changes: 24 additions & 4 deletions src/puffin/src/puffin_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub mod file_accessor;
use std::path::PathBuf;

use async_trait::async_trait;
use futures::future::BoxFuture;
use futures::{AsyncRead, AsyncSeek};

use crate::blob_metadata::CompressionCodec;
Expand Down Expand Up @@ -69,12 +70,31 @@ pub struct PutOptions {
/// The `PuffinReader` trait provides methods for reading blobs and directories from a Puffin file.
#[async_trait]
pub trait PuffinReader {
type Reader: AsyncRead + AsyncSeek;
type Blob: BlobGuard;
type Dir: DirGuard;

/// Reads a blob from the Puffin file.
async fn blob(&self, key: &str) -> Result<Self::Reader>;
///
/// The returned `BlobGuard` is used to access the blob data.
/// Users should hold the `BlobGuard` until they are done with the blob data.
async fn blob(&self, key: &str) -> Result<Self::Blob>;

/// Reads a directory from the Puffin file.
/// The returned `PathBuf` is used to access the directory in the filesystem.
async fn dir(&self, key: &str) -> Result<PathBuf>;
///
/// The returned `DirGuard` is used to access the directory in the filesystem.
/// The caller is responsible for holding the `DirGuard` until they are done with the directory.
async fn dir(&self, key: &str) -> Result<Self::Dir>;
}

/// `BlobGuard` is provided by the `PuffinReader` to access the blob data.
/// Users should hold the `BlobGuard` until they are done with the blob data.
pub trait BlobGuard {
type Reader: AsyncRead + AsyncSeek;
fn reader(&self) -> BoxFuture<'static, Result<Self::Reader>>;
}

/// `DirGuard` is provided by the `PuffinReader` to access the directory in the filesystem.
/// Users should hold the `DirGuard` until they are done with the directory.
pub trait DirGuard {
fn path(&self) -> &PathBuf;
}
28 changes: 19 additions & 9 deletions src/puffin/src/puffin_manager/cache_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,17 @@
// See the License for the specific language governing permissions and
// limitations under the License.

mod moka_cache_manager;

use std::path::PathBuf;
use std::sync::Arc;

use async_trait::async_trait;
use futures::future::BoxFuture;
use futures::{AsyncRead, AsyncSeek, AsyncWrite};
use futures::AsyncWrite;

use crate::error::Result;
use crate::puffin_manager::{BlobGuard, DirGuard};

pub type BoxWriter = Box<dyn AsyncWrite + Unpin + Send>;

Expand All @@ -39,34 +42,41 @@ pub type DirWriterProviderRef = Box<dyn DirWriterProvider + Send>;
///
/// `CacheManager` will provide a `BoxWriter` that the caller of `get_blob`
/// can use to write the blob into the cache.
pub trait InitBlobFn = FnOnce(BoxWriter) -> WriteResult;
pub trait InitBlobFn = Fn(BoxWriter) -> WriteResult;

/// Function that initializes a directory.
///
/// `CacheManager` will provide a `DirWriterProvider` that the caller of `get_dir`
/// can use to write files inside the directory into the cache.
pub trait InitDirFn = FnOnce(DirWriterProviderRef) -> WriteResult;
pub trait InitDirFn = Fn(DirWriterProviderRef) -> WriteResult;

/// `CacheManager` manages the cache for the puffin files.
#[async_trait]
pub trait CacheManager {
type Reader: AsyncRead + AsyncSeek;
type Blob: BlobGuard;
type Dir: DirGuard;

/// Retrieves a blob, initializing it if necessary using the provided `init_fn`.
///
/// The returned `BlobGuard` is used to access the blob reader.
/// The caller is responsible for holding the `BlobGuard` until they are done with the blob.
async fn get_blob<'a>(
&self,
puffin_file_name: &str,
key: &str,
init_factory: Box<dyn InitBlobFn + Send + 'a>,
) -> Result<Self::Reader>;
init_factory: Box<dyn InitBlobFn + Send + Sync + 'a>,
) -> Result<Self::Blob>;

/// Retrieves a directory, initializing it if necessary using the provided `init_fn`.
///
/// The returned `DirGuard` is used to access the directory in the filesystem.
/// The caller is responsible for holding the `DirGuard` until they are done with the directory.
async fn get_dir<'a>(
&self,
puffin_file_name: &str,
key: &str,
init_fn: Box<dyn InitDirFn + Send + 'a>,
) -> Result<PathBuf>;
init_fn: Box<dyn InitDirFn + Send + Sync + 'a>,
) -> Result<Self::Dir>;

/// Stores a directory in the cache.
async fn put_dir(
Expand All @@ -78,4 +88,4 @@ pub trait CacheManager {
) -> Result<()>;
}

pub type CacheManagerRef<R> = Arc<dyn CacheManager<Reader = R> + Send + Sync>;
pub type CacheManagerRef<B, D> = Arc<dyn CacheManager<Blob = B, Dir = D> + Send + Sync>;
Loading

0 comments on commit b69b24a

Please sign in to comment.