From 3e2f3e77f6a23c48f2088b85dcd55933925b465c Mon Sep 17 00:00:00 2001
From: Oleksii Vykaliuk
Date: Tue, 10 Dec 2024 12:57:33 +0100
Subject: [PATCH] Add per bucket cache for ObjectStore

---
 Cargo.toml                      |   1 +
 crates/core/Cargo.toml          |   3 +-
 crates/core/src/format.rs       |   4 +-
 crates/core/src/object_store.rs | 173 +++++++++++++++++++++++++++-----
 4 files changed, 152 insertions(+), 29 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 6a291dd7..658acc9b 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -57,6 +57,7 @@ log = "0.4.22"
 mime = "0.3.17"
 mockito = "1.5"
 object_store = "0.11.0"
+once_cell = "1.20.2"
 openssl = { version = "0.10.68", features = ["vendored"] }
 openssl-src = "=300.4.1" # joinked from https://github.com/iopsystems/rpc-perf/commit/705b290d2105af6f33150da04b217422c6d68701#diff-2e9d962a08321605940b5a657135052fbcef87b5e360662bb527c96d9a615542R41 to cross-compile Python
 parquet = { version = "52.2", default-features = false }
diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml
index acf643bb..28218856 100644
--- a/crates/core/Cargo.toml
+++ b/crates/core/Cargo.toml
@@ -31,7 +31,7 @@ geoparquet-compression = [
     "parquet/lz4",
     "parquet/zstd",
 ]
-object-store = ["dep:object_store", "dep:tokio"]
+object-store = ["dep:object_store", "dep:tokio", "dep:once_cell"]
 object-store-aws = ["object-store", "object_store/aws"]
 object-store-azure = ["object-store", "object_store/azure"]
 object-store-gcp = ["object-store", "object_store/gcp"]
@@ -63,6 +63,7 @@ jsonschema = { workspace = true, optional = true }
 log.workspace = true
 mime.workspace = true
 object_store = { workspace = true, optional = true }
+once_cell = { workspace = true, optional = true }
 parquet = { workspace = true, optional = true }
 reqwest = { workspace = true, features = ["json", "blocking"], optional = true }
 serde = { workspace = true, features = ["derive"] }
diff --git a/crates/core/src/format.rs b/crates/core/src/format.rs
index 0245bf0d..aae86a7d 100644
--- a/crates/core/src/format.rs
+++ b/crates/core/src/format.rs
@@ -156,9 +156,7 @@ impl Format {
         let href = href.into();
         match href.realize() {
             RealizedHref::Url(url) => {
-                use object_store::ObjectStore;
-
-                let (object_store, path) = crate::parse_url_opts(&url, options)?;
+                let (object_store, path) = crate::parse_url_opts(&url, options).await?;
                 let get_result = object_store.get(&path).await?;
                 let mut value: T = self.from_bytes(get_result.bytes().await?)?;
                 *value.self_href_mut() = Some(Href::Url(url));
diff --git a/crates/core/src/object_store.rs b/crates/core/src/object_store.rs
index 3eb27e10..5f9247f1 100644
--- a/crates/core/src/object_store.rs
+++ b/crates/core/src/object_store.rs
@@ -1,9 +1,45 @@
-use object_store::{
-    local::LocalFileSystem, memory::InMemory, path::Path, DynObjectStore, ObjectStoreScheme,
-};
+use std::{collections::HashMap, sync::Arc};
+
+use object_store::{local::LocalFileSystem, path::Path, DynObjectStore, ObjectStoreScheme};
+use once_cell::sync::Lazy;
+use tokio::sync::RwLock;
 use url::Url;
 
-#[cfg(feature = "object-store")]
+static OBJECT_STORE_CACHE: Lazy<RwLock<HashMap<ObjectStoreIdentifier, Arc<DynObjectStore>>>> =
+    Lazy::new(Default::default);
+
+/// Parameter set to identify and cache an object store
+#[derive(PartialEq, Eq, Hash)]
+struct ObjectStoreIdentifier {
+    /// A base url to the bucket.
+    // should be enough to identify cloud provider and bucket
+    base_url: Url,
+
+    /// Object Store options
+    options: Vec<(String, String)>,
+}
+
+impl ObjectStoreIdentifier {
+    fn new<I, K, V>(base_url: Url, options: I) -> Self
+    where
+        I: IntoIterator<Item = (K, V)>,
+        K: AsRef<str>,
+        V: Into<String>,
+    {
+        Self {
+            base_url,
+            options: options
+                .into_iter()
+                .map(|(k, v)| (k.as_ref().into(), v.into()))
+                .collect(),
+        }
+    }
+
+    fn get_options(&self) -> Vec<(String, String)> {
+        self.options.to_owned()
+    }
+}
+
 macro_rules! builder_env_opts {
     ($builder:ty, $url:expr, $options:expr) => {{
         let builder = $options.into_iter().fold(
@@ -13,54 +49,45 @@ macro_rules! builder_env_opts {
                 Err(_) => builder,
             },
         );
-        Box::new(builder.build()?)
+        Arc::new(builder.build()?)
     }};
 }
 
-/// Modified version of object_store::parse_url_opts that also parses env
-///
-/// It does the same, except we start from env vars, then apply url and then overrides from options
-///
-/// This is POC. To improve on this idea, maybe it's good to "cache" a box with dynamic ObjectStore for each bucket we access, since ObjectStore have some logic inside tied to a bucket level, like connection pooling, credential caching
-pub fn parse_url_opts<I, K, V>(
+fn create_object_store<I, K, V>(
+    scheme: ObjectStoreScheme,
     url: &Url,
     options: I,
-) -> Result<(Box<DynObjectStore>, Path), object_store::Error>
+) -> Result<Arc<DynObjectStore>, object_store::Error>
 where
     I: IntoIterator<Item = (K, V)>,
     K: AsRef<str>,
     V: Into<String>,
 {
-    let _options = options;
-    let (scheme, path) = ObjectStoreScheme::parse(url)?;
-    let path = Path::parse(path)?;
-
-    let store: Box<DynObjectStore> = match scheme {
-        ObjectStoreScheme::Local => Box::new(LocalFileSystem::new()),
-        ObjectStoreScheme::Memory => Box::new(InMemory::new()),
+    let store: Arc<DynObjectStore> = match scheme {
+        ObjectStoreScheme::Local => Arc::new(LocalFileSystem::new()),
         #[cfg(feature = "object-store-aws")]
         ObjectStoreScheme::AmazonS3 => {
-            builder_env_opts!(object_store::aws::AmazonS3Builder, url, _options)
+            builder_env_opts!(object_store::aws::AmazonS3Builder, url, options)
         }
         #[cfg(feature = "object-store-gcp")]
         ObjectStoreScheme::GoogleCloudStorage => {
-            builder_env_opts!(object_store::gcp::GoogleCloudStorageBuilder, url, _options)
+            builder_env_opts!(object_store::gcp::GoogleCloudStorageBuilder, url, options)
         }
         #[cfg(feature = "object-store-azure")]
         ObjectStoreScheme::MicrosoftAzure => {
-            builder_env_opts!(object_store::azure::MicrosoftAzureBuilder, url, _options)
+            builder_env_opts!(object_store::azure::MicrosoftAzureBuilder, url, options)
         }
         #[cfg(feature = "object-store-http")]
         ObjectStoreScheme::Http => {
             let url = &url[..url::Position::BeforePath];
-            let builder = _options.into_iter().fold(
+            let builder = options.into_iter().fold(
                 object_store::http::HttpBuilder::new().with_url(url.to_string()),
                 |builder, (key, value)| match key.as_ref().parse() {
                     Ok(k) => builder.with_config(k, value),
                     Err(_) => builder,
                 },
             );
-            Box::new(builder.build()?)
+            Arc::new(builder.build()?)
         }
         s => {
             return Err(object_store::Error::Generic {
@@ -69,5 +96,101 @@ where
             })
         }
     };
-    Ok((store, path))
+    Ok(store)
+}
+
+/// Modified version of object_store::parse_url_opts that also parses env
+///
+/// It does the same, except we start from env vars, then apply url and then overrides from options
+///
+/// Object stores are cached per bucket and option set, since an ObjectStore keeps bucket-level state (connection pooling, credential caches) that is worth reusing.
+pub async fn parse_url_opts<I, K, V>(
+    url: &Url,
+    options: I,
+) -> Result<(Arc<DynObjectStore>, Path), crate::Error>
+where
+    I: IntoIterator<Item = (K, V)>,
+    K: AsRef<str>,
+    V: Into<String>,
+{
+    // TODO: Handle error properly
+    let (scheme, path) = ObjectStoreScheme::parse(url).unwrap();
+
+    let path_string: String = path.clone().into();
+    let path_str = path_string.as_str();
+    // TODO: Handle error properly
+    let base_url = url[..]
+        .strip_suffix(path_str)
+        .unwrap_or_default()
+        .try_into()
+        .unwrap();
+
+    let object_store_id = ObjectStoreIdentifier::new(base_url, options);
+    let options = object_store_id.get_options();
+
+    {
+        let cache = OBJECT_STORE_CACHE.read().await;
+        if let Some(store) = cache.get(&object_store_id) {
+            return Ok((store.clone(), path));
+        }
+    }
+
+    let store = create_object_store(scheme, url, options)?;
+    {
+        let mut cache = OBJECT_STORE_CACHE.write().await;
+
+        // TODO: Do we need this cache clean? What is a reasonable cache size here?
+        if cache.len() >= 8 {
+            cache.clear()
+        }
+        _ = cache.insert(object_store_id, store.clone());
+    }
+
+    Ok((store.clone(), path))
+}
+
+#[cfg(test)]
+mod tests {
+    use url::Url;
+
+    use super::*;
+
+    #[tokio::test]
+    async fn cache_works() {
+        let url = Url::parse("s3://bucket/item").unwrap();
+        let options: Vec<(String, String)> = Vec::new();
+
+        let (store1, _path) = parse_url_opts(&url, options.clone()).await.unwrap();
+
+        let url2 = Url::parse("s3://bucket/item2").unwrap();
+        let (store2, _path) = parse_url_opts(&url2, options.clone()).await.unwrap();
+
+        assert!(Arc::ptr_eq(&store1, &store2));
+    }
+    #[tokio::test]
+    async fn different_options() {
+        let url = Url::parse("s3://bucket/item").unwrap();
+        let options: Vec<(String, String)> = Vec::new();
+
+        let (store, _path) = parse_url_opts(&url, options).await.unwrap();
+
+        let url2 = Url::parse("s3://bucket/item2").unwrap();
+        let options2: Vec<(String, String)> = vec![(String::from("some"), String::from("option"))];
+        let (store2, _path) = parse_url_opts(&url2, options2).await.unwrap();
+
+        assert!(!Arc::ptr_eq(&store, &store2));
+    }
+    #[tokio::test]
+    async fn different_urls() {
+        let url = Url::parse("s3://bucket/item").unwrap();
+        let options: Vec<(String, String)> = Vec::new();
+
+        let (store, _path) = parse_url_opts(&url, options.clone()).await.unwrap();
+
+        let url2 = Url::parse("s3://other-bucket/item").unwrap();
+        // let options2: Vec<(String, String)> = vec![(String::from("some"), String::from("option"))];
+        let (store2, _path) = parse_url_opts(&url2, options).await.unwrap();
+
+        assert!(!Arc::ptr_eq(&store, &store2));
+    }
 }