Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(puffin): implement MokaCacheManager #4211

Merged
merged 21 commits into from
Jul 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions src/puffin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,24 @@ workspace = true
async-compression = "0.4.11"
async-trait.workspace = true
async-walkdir = "2.0.0"
base64.workspace = true
bitflags.workspace = true
common-error.workspace = true
common-macro.workspace = true
common-runtime.workspace = true
common-telemetry.workspace = true
derive_builder.workspace = true
futures.workspace = true
lz4_flex = "0.11"
moka.workspace = true
pin-project.workspace = true
serde.workspace = true
serde_json.workspace = true
sha2 = "0.10.8"
snafu.workspace = true
tokio.workspace = true
tokio-util.workspace = true
uuid.workspace = true

[dev-dependencies]
common-test-util.workspace = true
33 changes: 33 additions & 0 deletions src/puffin/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use std::any::Any;
use std::io::Error as IoError;
use std::sync::Arc;

use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
Expand Down Expand Up @@ -80,6 +81,30 @@ pub enum Error {
location: Location,
},

#[snafu(display("Failed to create"))]
Create {
#[snafu(source)]
error: IoError,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Failed to rename"))]
Rename {
#[snafu(source)]
error: IoError,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Failed to remove"))]
Remove {
#[snafu(source)]
error: IoError,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Error while walking directory"))]
WalkDirError {
#[snafu(source)]
Expand Down Expand Up @@ -220,6 +245,9 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Get value from cache"))]
CacheGet { source: Arc<Error> },
}

impl ErrorExt for Error {
Expand All @@ -235,6 +263,9 @@ impl ErrorExt for Error {
| Close { .. }
| Open { .. }
| Metadata { .. }
| Create { .. }
| Remove { .. }
| Rename { .. }
| SerializeJson { .. }
| BytesToInteger { .. }
| ParseStageNotMatch { .. }
Expand All @@ -254,6 +285,8 @@ impl ErrorExt for Error {
}

DuplicateBlob { .. } => StatusCode::InvalidArguments,

CacheGet { source } => source.status_code(),
}
}

Expand Down
28 changes: 24 additions & 4 deletions src/puffin/src/puffin_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub mod file_accessor;
use std::path::PathBuf;

use async_trait::async_trait;
use futures::future::BoxFuture;
zhongzc marked this conversation as resolved.
Show resolved Hide resolved
use futures::{AsyncRead, AsyncSeek};

use crate::blob_metadata::CompressionCodec;
Expand Down Expand Up @@ -69,12 +70,31 @@ pub struct PutOptions {
/// The `PuffinReader` trait provides methods for reading blobs and directories from a Puffin file.
#[async_trait]
pub trait PuffinReader {
type Reader: AsyncRead + AsyncSeek;
type Blob: BlobGuard;
type Dir: DirGuard;

/// Reads a blob from the Puffin file.
async fn blob(&self, key: &str) -> Result<Self::Reader>;
///
/// The returned `BlobGuard` is used to access the blob data.
/// Users should hold the `BlobGuard` until they are done with the blob data.
async fn blob(&self, key: &str) -> Result<Self::Blob>;

/// Reads a directory from the Puffin file.
/// The returned `PathBuf` is used to access the directory in the filesystem.
async fn dir(&self, key: &str) -> Result<PathBuf>;
///
/// The returned `DirGuard` is used to access the directory in the filesystem.
/// The caller is responsible for holding the `DirGuard` until they are done with the directory.
async fn dir(&self, key: &str) -> Result<Self::Dir>;
}

/// `BlobGuard` is provided by the `PuffinReader` to access the blob data.
/// Users should hold the `BlobGuard` until they are done with the blob data.
pub trait BlobGuard {
type Reader: AsyncRead + AsyncSeek;
fn reader(&self) -> BoxFuture<'static, Result<Self::Reader>>;
}

/// `DirGuard` is provided by the `PuffinReader` to access the directory in the filesystem.
/// Users should hold the `DirGuard` until they are done with the directory.
pub trait DirGuard {
fn path(&self) -> &PathBuf;
}
28 changes: 19 additions & 9 deletions src/puffin/src/puffin_manager/cache_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,17 @@
// See the License for the specific language governing permissions and
// limitations under the License.

mod moka_cache_manager;

use std::path::PathBuf;
use std::sync::Arc;

use async_trait::async_trait;
use futures::future::BoxFuture;
use futures::{AsyncRead, AsyncSeek, AsyncWrite};
use futures::AsyncWrite;
zhongzc marked this conversation as resolved.
Show resolved Hide resolved

use crate::error::Result;
use crate::puffin_manager::{BlobGuard, DirGuard};

pub type BoxWriter = Box<dyn AsyncWrite + Unpin + Send>;

Expand All @@ -39,34 +42,41 @@ pub type DirWriterProviderRef = Box<dyn DirWriterProvider + Send>;
///
/// `CacheManager` will provide a `BoxWriter` that the caller of `get_blob`
/// can use to write the blob into the cache.
pub trait InitBlobFn = FnOnce(BoxWriter) -> WriteResult;
pub trait InitBlobFn = Fn(BoxWriter) -> WriteResult;

/// Function that initializes a directory.
///
/// `CacheManager` will provide a `DirWriterProvider` that the caller of `get_dir`
/// can use to write files inside the directory into the cache.
pub trait InitDirFn = FnOnce(DirWriterProviderRef) -> WriteResult;
pub trait InitDirFn = Fn(DirWriterProviderRef) -> WriteResult;

/// `CacheManager` manages the cache for the puffin files.
#[async_trait]
pub trait CacheManager {
type Reader: AsyncRead + AsyncSeek;
type Blob: BlobGuard;
type Dir: DirGuard;

/// Retrieves a blob, initializing it if necessary using the provided `init_fn`.
///
/// The returned `BlobGuard` is used to access the blob reader.
/// The caller is responsible for holding the `BlobGuard` until they are done with the blob.
async fn get_blob<'a>(
&self,
puffin_file_name: &str,
key: &str,
init_factory: Box<dyn InitBlobFn + Send + 'a>,
) -> Result<Self::Reader>;
init_factory: Box<dyn InitBlobFn + Send + Sync + 'a>,
) -> Result<Self::Blob>;

/// Retrieves a directory, initializing it if necessary using the provided `init_fn`.
///
/// The returned `DirGuard` is used to access the directory in the filesystem.
/// The caller is responsible for holding the `DirGuard` until they are done with the directory.
async fn get_dir<'a>(
&self,
puffin_file_name: &str,
key: &str,
init_fn: Box<dyn InitDirFn + Send + 'a>,
) -> Result<PathBuf>;
init_fn: Box<dyn InitDirFn + Send + Sync + 'a>,
) -> Result<Self::Dir>;

/// Stores a directory in the cache.
async fn put_dir(
Expand All @@ -78,4 +88,4 @@ pub trait CacheManager {
) -> Result<()>;
}

pub type CacheManagerRef<R> = Arc<dyn CacheManager<Reader = R> + Send + Sync>;
pub type CacheManagerRef<B, D> = Arc<dyn CacheManager<Blob = B, Dir = D> + Send + Sync>;
Loading