Skip to content

Commit

Permalink
Add object store cache for GCS Builder
Browse files Browse the repository at this point in the history
  • Loading branch information
benjamin-awd committed May 23, 2024
1 parent b52093f commit d5d46ee
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 1 deletion.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/arroyo-storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,4 @@ tokio-util = {version = "0.7.9", features = ["io"]}
async-trait = "0.1.73"
futures = "0.3.28"
webpki = ">=0.22.2"
once_cell = "1.19.0"
60 changes: 59 additions & 1 deletion crates/arroyo-storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,13 @@ use object_store::multipart::PartId;
use object_store::path::Path;
use object_store::{aws::AmazonS3Builder, local::LocalFileSystem, ObjectStore};
use object_store::{CredentialProvider, MultipartId};
use once_cell::sync::Lazy;
use regex::{Captures, Regex};
use std::time::{Duration, Instant};
use thiserror::Error;
use tokio::sync::RwLock;
use tracing::{debug, trace};

mod aws;

/// A reference-counted reference to a [StorageProvider].
Expand Down Expand Up @@ -311,6 +316,18 @@ pub async fn get_current_credentials() -> Result<Arc<AwsCredential>, StorageErro
Ok(credentials)
}

// Process-wide cache of constructed object stores, keyed by bucket name.
// Guarded by an async RwLock so concurrent storage-provider construction
// can reuse a single client per bucket instead of rebuilding it each time.
// Entries are considered stale after GCS_CACHE_TTL.
static OBJECT_STORE_CACHE: Lazy<RwLock<HashMap<String, CacheEntry<Arc<dyn ObjectStore>>>>> =
Lazy::new(Default::default);

// A cached value paired with the time it was inserted, so callers can
// enforce a time-to-live by comparing `inserted_at.elapsed()` to a TTL.
struct CacheEntry<T> {
// The cached value itself.
value: T,
// When this entry was added to the cache; used for TTL expiry checks.
inserted_at: Instant,
}

// The GCS bearer token should last for 3600 seconds, but we expire cache
// entries after 5 minutes so a fresh token is obtained well before the
// old one can expire.
const GCS_CACHE_TTL: Duration = Duration::from_secs(5 * 60);

impl StorageProvider {
pub async fn for_url(url: &str) -> Result<Self, StorageError> {
Self::for_url_with_options(url, HashMap::new()).await
Expand Down Expand Up @@ -441,6 +458,45 @@ impl StorageProvider {
})
}

/// Returns a cached object store for `bucket`, building a new one from
/// `builder` when there is no cached entry or the entry is older than
/// `GCS_CACHE_TTL` (so the embedded bearer token is refreshed before it
/// can expire).
///
/// Cache hits only take the read lock, so concurrent callers are not
/// serialized behind exclusive access; the write lock is taken only on a
/// miss or an expired entry, and the cache is re-checked after acquiring
/// it in case another task refreshed the entry while we waited.
async fn get_or_create_object_store(
    builder: GoogleCloudStorageBuilder,
    bucket: &str,
) -> Result<Arc<dyn ObjectStore>, StorageError> {
    // Fast path: a shared read lock is sufficient for a cache hit.
    {
        let cache = OBJECT_STORE_CACHE.read().await;
        if let Some(entry) = cache.get(bucket) {
            if entry.inserted_at.elapsed() < GCS_CACHE_TTL {
                trace!(
                    "Cache hit - using cached object store for bucket {}",
                    bucket
                );
                return Ok(entry.value.clone());
            }
        }
    }

    // Slow path: take the write lock, then re-check, since another task
    // may have inserted a fresh entry while we waited for the lock.
    let mut cache = OBJECT_STORE_CACHE.write().await;
    match cache.get(bucket) {
        Some(entry) if entry.inserted_at.elapsed() < GCS_CACHE_TTL => {
            trace!(
                "Cache hit - using cached object store for bucket {}",
                bucket
            );
            return Ok(entry.value.clone());
        }
        Some(_) => {
            debug!(
                "Cache expired - constructing new object store for bucket {}",
                bucket
            );
        }
        None => {
            debug!(
                "Cache miss - constructing new object store for bucket {}",
                bucket
            );
        }
    }

    let new_store: Arc<dyn ObjectStore> =
        Arc::new(builder.build().map_err(Into::<StorageError>::into)?);

    cache.insert(
        bucket.to_string(),
        CacheEntry {
            value: new_store.clone(),
            inserted_at: Instant::now(),
        },
    );

    Ok(new_store)
}

async fn construct_gcs(config: GCSConfig) -> Result<Self, StorageError> {
let mut builder = GoogleCloudStorageBuilder::from_env().with_bucket_name(&config.bucket);

Expand All @@ -456,9 +512,11 @@ impl StorageProvider {

let object_store_base_url = format!("https://{}.storage.googleapis.com", config.bucket);

let object_store = Self::get_or_create_object_store(builder, &config.bucket).await?;

Ok(Self {
config: BackendConfig::GCS(config),
object_store: Arc::new(builder.build().map_err(Into::<StorageError>::into)?),
object_store,
object_store_base_url,
canonical_url,
storage_options: HashMap::new(),
Expand Down

0 comments on commit d5d46ee

Please sign in to comment.