Skip to content

Commit

Permalink
Add per bucket cache for ObjectStore
Browse files Browse the repository at this point in the history
  • Loading branch information
alekzvik committed Dec 21, 2024
1 parent 7d5bc24 commit 3e2f3e7
Show file tree
Hide file tree
Showing 4 changed files with 152 additions and 29 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ log = "0.4.22"
mime = "0.3.17"
mockito = "1.5"
object_store = "0.11.0"
once_cell = "1.20.2"
openssl = { version = "0.10.68", features = ["vendored"] }
openssl-src = "=300.4.1" # joinked from https://github.com/iopsystems/rpc-perf/commit/705b290d2105af6f33150da04b217422c6d68701#diff-2e9d962a08321605940b5a657135052fbcef87b5e360662bb527c96d9a615542R41 to cross-compile Python
parquet = { version = "52.2", default-features = false }
Expand Down
3 changes: 2 additions & 1 deletion crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ geoparquet-compression = [
"parquet/lz4",
"parquet/zstd",
]
object-store = ["dep:object_store", "dep:tokio"]
object-store = ["dep:object_store", "dep:tokio", "dep:once_cell"]
object-store-aws = ["object-store", "object_store/aws"]
object-store-azure = ["object-store", "object_store/azure"]
object-store-gcp = ["object-store", "object_store/gcp"]
Expand Down Expand Up @@ -63,6 +63,7 @@ jsonschema = { workspace = true, optional = true }
log.workspace = true
mime.workspace = true
object_store = { workspace = true, optional = true }
once_cell = { workspace = true, optional = true }
parquet = { workspace = true, optional = true }
reqwest = { workspace = true, features = ["json", "blocking"], optional = true }
serde = { workspace = true, features = ["derive"] }
Expand Down
4 changes: 1 addition & 3 deletions crates/core/src/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,7 @@ impl Format {
let href = href.into();
match href.realize() {
RealizedHref::Url(url) => {
use object_store::ObjectStore;

let (object_store, path) = crate::parse_url_opts(&url, options)?;
let (object_store, path) = crate::parse_url_opts(&url, options).await?;
let get_result = object_store.get(&path).await?;
let mut value: T = self.from_bytes(get_result.bytes().await?)?;
*value.self_href_mut() = Some(Href::Url(url));
Expand Down
173 changes: 148 additions & 25 deletions crates/core/src/object_store.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,45 @@
use object_store::{
local::LocalFileSystem, memory::InMemory, path::Path, DynObjectStore, ObjectStoreScheme,
};
use std::{collections::HashMap, sync::Arc};

use object_store::{local::LocalFileSystem, path::Path, DynObjectStore, ObjectStoreScheme};
use once_cell::sync::Lazy;
use tokio::sync::RwLock;
use url::Url;

/// Process-wide cache of object stores, keyed by bucket base url + options.
/// Guarded by a tokio `RwLock` so concurrent readers don't serialize;
/// lazily initialized to an empty map on first access.
#[cfg(feature = "object-store")]
static OBJECT_STORE_CACHE: Lazy<RwLock<HashMap<ObjectStoreIdentifier, Arc<DynObjectStore>>>> =
    Lazy::new(Default::default);

/// Parameter set to identify and cache an object Storage.
///
/// Two lookups share a cached store only when both the bucket base url and
/// the full option set compare equal (this type is the `HashMap` key).
#[derive(Debug, PartialEq, Eq, Hash)]
struct ObjectStoreIdentifier {
    /// A base url to the bucket.
    // should be enough to identify cloud provider and bucket
    base_url: Url,

    /// Object Store options
    options: Vec<(String, String)>,
}

impl ObjectStoreIdentifier {
    /// Builds an identifier from a bucket base url and any iterator of
    /// string-like option pairs, normalizing each pair to owned `String`s.
    fn new<I, K, V>(base_url: Url, options: I) -> Self
    where
        I: IntoIterator<Item = (K, V)>,
        K: AsRef<str>,
        V: Into<String>,
    {
        let options = options
            .into_iter()
            .map(|(key, value)| (key.as_ref().to_string(), value.into()))
            .collect();
        Self { base_url, options }
    }

    /// Returns an owned copy of the stored options.
    fn get_options(&self) -> Vec<(String, String)> {
        self.options.clone()
    }
}

macro_rules! builder_env_opts {
($builder:ty, $url:expr, $options:expr) => {{
let builder = $options.into_iter().fold(
Expand All @@ -13,54 +49,45 @@ macro_rules! builder_env_opts {
Err(_) => builder,
},
);
Box::new(builder.build()?)
Arc::new(builder.build()?)
}};
}

/// Modified version of object_store::parse_url_opts that also parses env
///
/// It does the same, except we start from env vars, then apply url and then overrides from options
///
/// This is POC. To improve on this idea, maybe it's good to "cache" a box with dynamic ObjectStore for each bucket we access, since ObjectStore have some logic inside tied to a bucket level, like connection pooling, credential caching
pub fn parse_url_opts<I, K, V>(
fn create_object_store<I, K, V>(
scheme: ObjectStoreScheme,
url: &Url,
options: I,
) -> Result<(Box<DynObjectStore>, Path), object_store::Error>
) -> Result<Arc<DynObjectStore>, object_store::Error>
where
I: IntoIterator<Item = (K, V)>,
K: AsRef<str>,
V: Into<String>,
{
let _options = options;
let (scheme, path) = ObjectStoreScheme::parse(url)?;
let path = Path::parse(path)?;

let store: Box<DynObjectStore> = match scheme {
ObjectStoreScheme::Local => Box::new(LocalFileSystem::new()),
ObjectStoreScheme::Memory => Box::new(InMemory::new()),
let store: Arc<DynObjectStore> = match scheme {
ObjectStoreScheme::Local => Arc::new(LocalFileSystem::new()),
#[cfg(feature = "object-store-aws")]
ObjectStoreScheme::AmazonS3 => {
builder_env_opts!(object_store::aws::AmazonS3Builder, url, _options)
builder_env_opts!(object_store::aws::AmazonS3Builder, url, options)
}
#[cfg(feature = "object-store-gcp")]
ObjectStoreScheme::GoogleCloudStorage => {
builder_env_opts!(object_store::gcp::GoogleCloudStorageBuilder, url, _options)
builder_env_opts!(object_store::gcp::GoogleCloudStorageBuilder, url, options)
}
#[cfg(feature = "object-store-azure")]
ObjectStoreScheme::MicrosoftAzure => {
builder_env_opts!(object_store::azure::MicrosoftAzureBuilder, url, _options)
builder_env_opts!(object_store::azure::MicrosoftAzureBuilder, url, options)
}
#[cfg(feature = "object-store-http")]
ObjectStoreScheme::Http => {
let url = &url[..url::Position::BeforePath];
let builder = _options.into_iter().fold(
let builder = options.into_iter().fold(
object_store::http::HttpBuilder::new().with_url(url.to_string()),
|builder, (key, value)| match key.as_ref().parse() {
Ok(k) => builder.with_config(k, value),
Err(_) => builder,
},
);
Box::new(builder.build()?)
Arc::new(builder.build()?)
}
s => {
return Err(object_store::Error::Generic {
Expand All @@ -69,5 +96,101 @@ where
})
}
};
Ok((store, path))
Ok(store)
}

/// Modified version of `object_store::parse_url_opts` that also parses env
/// vars and caches the resulting store per bucket.
///
/// Configuration precedence: environment variables first, then the url, then
/// overrides from `options`.
///
/// Stores are cached keyed by (bucket base url, options), since an
/// `ObjectStore` holds per-bucket state such as connection pools and cached
/// credentials.
pub async fn parse_url_opts<I, K, V>(
    url: &Url,
    options: I,
) -> Result<(Arc<DynObjectStore>, Path), crate::Error>
where
    I: IntoIterator<Item = (K, V)>,
    K: AsRef<str>,
    V: Into<String>,
{
    let (scheme, path) = ObjectStoreScheme::parse(url)?;

    // Derive the bucket-level base url by stripping the object path off the
    // end of the full url; together with the options this is the cache key.
    let path_string: String = path.clone().into();
    let base_url = url[..]
        .strip_suffix(path_string.as_str())
        .unwrap_or_default()
        .try_into()
        // TODO: Handle error properly — if stripping the path yields a string
        // that is no longer a valid url, this panics.
        .unwrap();

    let object_store_id = ObjectStoreIdentifier::new(base_url, options);
    let options = object_store_id.get_options();

    // Fast path: reuse a previously created store for the same bucket+options.
    // Read lock is dropped before we take the write lock below.
    {
        let cache = OBJECT_STORE_CACHE.read().await;
        if let Some(store) = cache.get(&object_store_id) {
            return Ok((store.clone(), path));
        }
    }

    let store = create_object_store(scheme, url, options)?;
    {
        let mut cache = OBJECT_STORE_CACHE.write().await;

        // Crude eviction to bound memory use.
        // TODO: Do we need this cache clean? What is a reasonable cache size here?
        if cache.len() >= 8 {
            cache.clear()
        }
        _ = cache.insert(object_store_id, store.clone());
    }

    Ok((store, path))
}

#[cfg(test)]
mod tests {
    use url::Url;

    use super::*;

    // NOTE(review): these tests use the s3 scheme, so they presumably require
    // the `object-store-aws` feature to be enabled — confirm CI runs with it.

    /// Two urls in the same bucket with identical options must resolve to the
    /// very same cached store instance (pointer equality on the Arc).
    #[tokio::test]
    async fn cache_works() {
        let url = Url::parse("s3://bucket/item").unwrap();
        let options: Vec<(String, String)> = Vec::new();

        let (store1, _path) = parse_url_opts(&url, options.clone()).await.unwrap();

        let url2 = Url::parse("s3://bucket/item2").unwrap();
        let (store2, _path) = parse_url_opts(&url2, options.clone()).await.unwrap();

        assert!(Arc::ptr_eq(&store1, &store2));
    }

    /// Same bucket but different options must NOT share a cached store.
    #[tokio::test]
    async fn different_options() {
        let url = Url::parse("s3://bucket/item").unwrap();
        let options: Vec<(String, String)> = Vec::new();

        let (store, _path) = parse_url_opts(&url, options).await.unwrap();

        let url2 = Url::parse("s3://bucket/item2").unwrap();
        let options2: Vec<(String, String)> = vec![(String::from("some"), String::from("option"))];
        let (store2, _path) = parse_url_opts(&url2, options2).await.unwrap();

        assert!(!Arc::ptr_eq(&store, &store2));
    }

    /// Different buckets must NOT share a cached store, even with equal options.
    #[tokio::test]
    async fn different_urls() {
        let url = Url::parse("s3://bucket/item").unwrap();
        let options: Vec<(String, String)> = Vec::new();

        let (store, _path) = parse_url_opts(&url, options.clone()).await.unwrap();

        let url2 = Url::parse("s3://other-bucket/item").unwrap();
        // let options2: Vec<(String, String)> = vec![(String::from("some"), String::from("option"))];
        let (store2, _path) = parse_url_opts(&url2, options).await.unwrap();

        assert!(!Arc::ptr_eq(&store, &store2));
    }
}

0 comments on commit 3e2f3e7

Please sign in to comment.