diff --git a/Cargo.lock b/Cargo.lock index 43ecccc52..4e526614a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -975,6 +975,7 @@ dependencies = [ "bytes", "futures", "object_store", + "once_cell", "regex", "rusoto_core", "thiserror", diff --git a/crates/arroyo-storage/Cargo.toml b/crates/arroyo-storage/Cargo.toml index eafa8cf4a..cbd34d01b 100644 --- a/crates/arroyo-storage/Cargo.toml +++ b/crates/arroyo-storage/Cargo.toml @@ -23,3 +23,4 @@ tokio-util = {version = "0.7.9", features = ["io"]} async-trait = "0.1.73" futures = "0.3.28" webpki = ">=0.22.2" +once_cell = "1.19.0" diff --git a/crates/arroyo-storage/src/lib.rs b/crates/arroyo-storage/src/lib.rs index c8726a088..22c5ddd66 100644 --- a/crates/arroyo-storage/src/lib.rs +++ b/crates/arroyo-storage/src/lib.rs @@ -16,8 +16,13 @@ use object_store::multipart::PartId; use object_store::path::Path; use object_store::{aws::AmazonS3Builder, local::LocalFileSystem, ObjectStore}; use object_store::{CredentialProvider, MultipartId}; +use once_cell::sync::Lazy; use regex::{Captures, Regex}; +use std::time::{Duration, Instant}; use thiserror::Error; +use tokio::sync::RwLock; +use tracing::{debug, trace}; + mod aws; /// A reference-counted reference to a [StorageProvider]. 
@@ -311,6 +316,18 @@ pub async fn get_current_credentials() -> Result<Arc<AwsCredentials>, StorageError> {
     Ok(credentials)
 }
 
+static OBJECT_STORE_CACHE: Lazy<RwLock<HashMap<String, CacheEntry<Arc<GoogleCloudStorage>>>>> =
+    Lazy::new(Default::default);
+
+struct CacheEntry<T> {
+    value: T,
+    inserted_at: Instant,
+}
+
+// The bearer token should last for 3600 seconds,
+// but we regenerate it every 5 minutes to avoid token expiry
+const GCS_CACHE_TTL: Duration = Duration::from_secs(5 * 60);
+
 impl StorageProvider {
     pub async fn for_url(url: &str) -> Result<Self, StorageError> {
         Self::for_url_with_options(url, HashMap::new()).await
@@ -441,6 +458,45 @@ impl StorageProvider {
         })
     }
 
+    async fn get_or_create_object_store(
+        builder: GoogleCloudStorageBuilder,
+        bucket: &str,
+    ) -> Result<Arc<GoogleCloudStorage>, StorageError> {
+        let mut cache = OBJECT_STORE_CACHE.write().await;
+
+        if let Some(entry) = cache.get(bucket) {
+            if entry.inserted_at.elapsed() < GCS_CACHE_TTL {
+                trace!(
+                    "Cache hit - using cached object store for bucket {}",
+                    bucket
+                );
+                return Ok(entry.value.clone());
+            } else {
+                debug!(
+                    "Cache expired - constructing new object store for bucket {}",
+                    bucket
+                );
+            }
+        } else {
+            debug!(
+                "Cache miss - constructing new object store for bucket {}",
+                bucket
+            );
+        }
+
+        let new_store = Arc::new(builder.build().map_err(Into::<StorageError>::into)?);
+
+        cache.insert(
+            bucket.to_string(),
+            CacheEntry {
+                value: new_store.clone(),
+                inserted_at: Instant::now(),
+            },
+        );
+
+        Ok(new_store)
+    }
+
     async fn construct_gcs(config: GCSConfig) -> Result<Self, StorageError> {
         let mut builder =
             GoogleCloudStorageBuilder::from_env().with_bucket_name(&config.bucket);
@@ -456,9 +512,11 @@ impl StorageProvider {
         let object_store_base_url =
            format!("https://{}.storage.googleapis.com", config.bucket);
 
+        let object_store = Self::get_or_create_object_store(builder, &config.bucket).await?;
+
         Ok(Self {
             config: BackendConfig::GCS(config),
-            object_store: Arc::new(builder.build().map_err(Into::<StorageError>::into)?),
+            object_store,
             object_store_base_url,
             canonical_url,
             storage_options: HashMap::new(),