Skip to content

Commit

Permalink
feat(caching): Adds a new caching client
Browse files Browse the repository at this point in the history
This adds a trait so it can be extendible and provides a default
filesystem implementation. I chose this way of doing it because I didn't
think the hassle of having a `Client<T>` was worth it just to add caching.
This is set up in such a way that you can still use a caching client as a
normal client

Signed-off-by: Taylor Thomas <taylor@cosmonic.com>
  • Loading branch information
thomastaylor312 committed Jun 25, 2024
1 parent 7e2d63a commit d7521d1
Show file tree
Hide file tree
Showing 4 changed files with 148 additions and 0 deletions.
65 changes: 65 additions & 0 deletions crates/wasm-pkg-client/src/caching/file.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
//! A `Cache` implementation for a filesystem

use std::path::{Path, PathBuf};

use anyhow::Context;
use bytes::Bytes;
use futures_util::{stream::BoxStream, StreamExt, TryStreamExt};
use tokio_util::io::{ReaderStream, StreamReader};
use wasm_pkg_common::{digest::ContentDigest, Error};

use super::Cache;

pub struct FileCache {
root: PathBuf,
}

impl FileCache {
pub async fn new(root: impl AsRef<Path>) -> anyhow::Result<Self> {
tokio::fs::create_dir_all(&root)
.await
.context("Unable to create cache directory")?;
Ok(Self {
root: root.as_ref().to_path_buf(),
})
}
}

impl Cache for FileCache {
async fn put_data(
&self,
digest: ContentDigest,
data: BoxStream<'_, Result<Bytes, Error>>,
) -> Result<(), Error> {
let path = self.root.join(digest.to_string());
let mut file = tokio::fs::File::create(&path).await.map_err(|e| {
Error::CacheError(anyhow::anyhow!("Unable to create file for cache {e}"))
})?;
let mut buf =
StreamReader::new(data.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)));
tokio::io::copy(&mut buf, &mut file)
.await
.map_err(|e| Error::CacheError(e.into()))
.map(|_| ())
}

async fn get_data(
&self,
digest: &ContentDigest,
) -> Result<Option<BoxStream<Result<Bytes, Error>>>, Error> {
let path = self.root.join(digest.to_string());
let exists = tokio::fs::try_exists(&path)
.await
.map_err(|e| Error::CacheError(e.into()))?;
if !exists {
return Ok(None);
}
let file = tokio::fs::File::open(path)
.await
.map_err(|e| Error::CacheError(e.into()))?;

Ok(Some(
ReaderStream::new(file).map_err(Error::IoError).boxed(),
))
}
}
80 changes: 80 additions & 0 deletions crates/wasm-pkg-client/src/caching/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
use std::future::Future;
use std::ops::Deref;

use bytes::Bytes;
use futures_util::stream::BoxStream;
use wasm_pkg_common::{digest::ContentDigest, package::PackageRef, Error};

use crate::{Client, Release};

mod file;

pub use file::FileCache;

/// A trait for a cache of data.
pub trait Cache {
/// Puts the data with the given hash into the cache
fn put_data(
&self,
digest: ContentDigest,
data: BoxStream<Result<Bytes, Error>>,
) -> impl Future<Output = Result<(), Error>> + Send;
/// Gets the data with the given hash from the cache. Returns None if the data is not in the cache.
fn get_data(
&self,
digest: &ContentDigest,
) -> impl Future<Output = Result<Option<BoxStream<Result<Bytes, Error>>>, Error>> + Send;
}

pub struct CachingClient<T> {
pub client: Client,
cache: T,
}

impl<T> AsRef<Client> for CachingClient<T> {
fn as_ref(&self) -> &Client {
&self.client
}
}

impl<T> Deref for CachingClient<T> {
type Target = Client;

fn deref(&self) -> &Self::Target {
&self.client
}
}

impl<T: Cache> CachingClient<T> {
pub fn new(client: Client, cache: T) -> Self {
Self { client, cache }
}

/// Returns a [`BoxStream`] of content chunks. If the data is in the cache, it will be returned,
/// otherwise it will be fetched from an upstream registry and then cached. This is the same as
/// [`Client::stream_content`] but named differently to avoid confusion when trying to use this
/// as a normal [`Client`].
pub async fn get_content(
&mut self,
package: &PackageRef,
release: &Release,
) -> Result<BoxStream<Result<Bytes, Error>>, Error> {
if let Some(data) = self.cache.get_data(&release.content_digest).await? {
return Ok(data);
}

let stream = self.client.stream_content(package, release).await?;
self.cache
.put_data(release.content_digest.clone(), stream)
.await?;

self.cache
.get_data(&release.content_digest)
.await?
.ok_or_else(|| {
Error::CacheError(anyhow::anyhow!(
"Cached data was deleted after putting the data in cache"
))
})
}
}
1 change: 1 addition & 0 deletions crates/wasm-pkg-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
//! # Ok(()) }
//! ```

pub mod caching;
mod loader;
pub mod local;
pub mod oci;
Expand Down
2 changes: 2 additions & 0 deletions crates/wasm-pkg-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ pub mod registry;

#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("error interacting with cache: {0}")]
CacheError(#[source] anyhow::Error),
#[error("error reading config file: {0}")]
ConfigFileIoError(#[source] std::io::Error),
#[error("failed to get registry credentials: {0:#}")]
Expand Down

0 comments on commit d7521d1

Please sign in to comment.