From 7d5bc243862c3089a678c749dc17f894224bca94 Mon Sep 17 00:00:00 2001 From: Oleksii Vykaliuk Date: Thu, 5 Dec 2024 00:01:27 +0100 Subject: [PATCH 1/4] feat: read cloud creds for obstore from env --- crates/core/src/format.rs | 2 +- crates/core/src/lib.rs | 5 +++ crates/core/src/object_store.rs | 73 +++++++++++++++++++++++++++++++++ 3 files changed, 79 insertions(+), 1 deletion(-) create mode 100644 crates/core/src/object_store.rs diff --git a/crates/core/src/format.rs b/crates/core/src/format.rs index 9c3ff4b8..0245bf0d 100644 --- a/crates/core/src/format.rs +++ b/crates/core/src/format.rs @@ -158,7 +158,7 @@ impl Format { RealizedHref::Url(url) => { use object_store::ObjectStore; - let (object_store, path) = object_store::parse_url_opts(&url, options)?; + let (object_store, path) = crate::parse_url_opts(&url, options)?; let get_result = object_store.get(&path).await?; let mut value: T = self.from_bytes(get_result.bytes().await?)?; *value.self_href_mut() = Some(Href::Url(url)); diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index 6bea547d..101abc43 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -178,6 +178,8 @@ pub mod mime; mod ndjson; mod node; #[cfg(feature = "object-store")] +mod object_store; +#[cfg(feature = "object-store")] mod resolver; mod statistics; #[cfg(feature = "validate")] @@ -187,6 +189,9 @@ mod version; use std::fmt::Display; +#[cfg(feature = "object-store")] +pub use object_store::parse_url_opts; + #[cfg(feature = "object-store")] pub use resolver::Resolver; #[cfg(feature = "validate-blocking")] diff --git a/crates/core/src/object_store.rs b/crates/core/src/object_store.rs new file mode 100644 index 00000000..3eb27e10 --- /dev/null +++ b/crates/core/src/object_store.rs @@ -0,0 +1,73 @@ +use object_store::{ + local::LocalFileSystem, memory::InMemory, path::Path, DynObjectStore, ObjectStoreScheme, +}; +use url::Url; + +#[cfg(feature = "object-store")] +macro_rules! builder_env_opts { + ($builder:ty, $url:expr, $options:expr) => {{ + let builder = $options.into_iter().fold( + <$builder>::from_env().with_url($url.to_string()), + |builder, (key, value)| match key.as_ref().parse() { + Ok(k) => builder.with_config(k, value), + Err(_) => builder, + }, + ); + Box::new(builder.build()?) + }}; +} + +/// Modified version of object_store::parse_url_opts that also parses env +/// +/// It does the same, except we start from env vars, then apply url and then overrides from options +/// +/// This is POC. 
To improve on this idea, maybe it's good to "cache" a box with dynamic ObjectStore for each bucket we access, since ObjectStore has some logic inside tied to a bucket level, like connection pooling, credential caching
+pub fn parse_url_opts<I, K, V>(
+    url: &Url,
+    options: I,
+) -> Result<(Box<DynObjectStore>, Path), object_store::Error>
+where
+    I: IntoIterator<Item = (K, V)>,
+    K: AsRef<str>,
+    V: Into<String>,
+{
+    let _options = options;
+    let (scheme, path) = ObjectStoreScheme::parse(url)?;
+    let path = Path::parse(path)?;
+
+    let store: Box<DynObjectStore> = match scheme {
+        ObjectStoreScheme::Local => Box::new(LocalFileSystem::new()),
+        ObjectStoreScheme::Memory => Box::new(InMemory::new()),
+        #[cfg(feature = "object-store-aws")]
+        ObjectStoreScheme::AmazonS3 => {
+            builder_env_opts!(object_store::aws::AmazonS3Builder, url, _options)
+        }
+        #[cfg(feature = "object-store-gcp")]
+        ObjectStoreScheme::GoogleCloudStorage => {
+            builder_env_opts!(object_store::gcp::GoogleCloudStorageBuilder, url, _options)
+        }
+        #[cfg(feature = "object-store-azure")]
+        ObjectStoreScheme::MicrosoftAzure => {
+            builder_env_opts!(object_store::azure::MicrosoftAzureBuilder, url, _options)
+        }
+        #[cfg(feature = "object-store-http")]
+        ObjectStoreScheme::Http => {
+            let url = &url[..url::Position::BeforePath];
+            let builder = _options.into_iter().fold(
+                object_store::http::HttpBuilder::new().with_url(url.to_string()),
+                |builder, (key, value)| match key.as_ref().parse() {
+                    Ok(k) => builder.with_config(k, value),
+                    Err(_) => builder,
+                },
+            );
+            Box::new(builder.build()?)
+        }
+        s => {
+            return Err(object_store::Error::Generic {
+                store: "parse_url",
+                source: format!("feature for {s:?} not enabled").into(),
+            })
+        }
+    };
+    Ok((store, path))
+}

From 3e2f3e77f6a23c48f2088b85dcd55933925b465c Mon Sep 17 00:00:00 2001
From: Oleksii Vykaliuk
Date: Tue, 10 Dec 2024 12:57:33 +0100
Subject: [PATCH 2/4] Add per bucket cache for ObjectStore

---
 Cargo.toml                      |   1 +
 crates/core/Cargo.toml          |   3 +-
 crates/core/src/format.rs       |   4 +-
 crates/core/src/object_store.rs | 173 +++++++++++++++++++++++++++-----
 4 files changed, 152 insertions(+), 29 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 6a291dd7..658acc9b 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -57,6 +57,7 @@ log = "0.4.22"
 mime = "0.3.17"
 mockito = "1.5"
 object_store = "0.11.0"
+once_cell = "1.20.2"
 openssl = { version = "0.10.68", features = ["vendored"] }
 openssl-src = "=300.4.1" # joinked from https://github.com/iopsystems/rpc-perf/commit/705b290d2105af6f33150da04b217422c6d68701#diff-2e9d962a08321605940b5a657135052fbcef87b5e360662bb527c96d9a615542R41 to cross-compile Python
 parquet = { version = "52.2", default-features = false }
diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml
index acf643bb..28218856 100644
--- a/crates/core/Cargo.toml
+++ b/crates/core/Cargo.toml
@@ -31,7 +31,7 @@ geoparquet-compression = [
     "parquet/lz4",
     "parquet/zstd",
 ]
-object-store = ["dep:object_store", "dep:tokio"]
+object-store = ["dep:object_store", "dep:tokio", "dep:once_cell"]
 object-store-aws = ["object-store", "object_store/aws"]
 object-store-azure = ["object-store", "object_store/azure"]
 object-store-gcp = ["object-store", "object_store/gcp"]
@@ -63,6 +63,7 @@ jsonschema = { workspace = true, optional = true }
 log.workspace = true
 mime.workspace = true
 object_store = { workspace = true, optional = true }
+once_cell = { workspace = true, optional = true }
 parquet = { workspace = true, optional = true }
 reqwest = { workspace = true, features = ["json", "blocking"], optional = true }
 serde = { workspace = true, features = ["derive"] }
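Note: the helper introduced in the first patch is meant to be called the way crates/core/src/format.rs calls it — parse the URL into a store plus path, then fetch the object. Below is a minimal sketch of a standalone caller, assuming the core crate is the `stac` crate built with the object-store-aws feature; the bucket URL, the aws_region override, and the function name read_item are illustrative and not taken from the patch:

    use object_store::ObjectStore; // brings .get() into scope for the boxed store
    use url::Url;

    async fn read_item() -> Result<Vec<u8>, Box<dyn std::error::Error>> {
        // Credentials such as AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY are read from the
        // environment via AmazonS3Builder::from_env(); keys passed in `options` override them.
        let url = Url::parse("s3://my-bucket/items/item.json")?;
        let options = [("aws_region", "eu-central-1")];
        let (store, path) = stac::parse_url_opts(&url, options)?;
        let bytes = store.get(&path).await?.bytes().await?;
        Ok(bytes.to_vec())
    }

(`stac::parse_url_opts` is the re-export added to crates/core/src/lib.rs above; after this second patch the call needs an `.await`, since the function becomes async.)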
diff --git a/crates/core/src/format.rs b/crates/core/src/format.rs
index 0245bf0d..aae86a7d 100644
--- a/crates/core/src/format.rs
+++ b/crates/core/src/format.rs
@@ -156,9 +156,7 @@ impl Format {
         let href = href.into();
         match href.realize() {
             RealizedHref::Url(url) => {
-                use object_store::ObjectStore;
-
-                let (object_store, path) = crate::parse_url_opts(&url, options)?;
+                let (object_store, path) = crate::parse_url_opts(&url, options).await?;
                 let get_result = object_store.get(&path).await?;
                 let mut value: T = self.from_bytes(get_result.bytes().await?)?;
                 *value.self_href_mut() = Some(Href::Url(url));
diff --git a/crates/core/src/object_store.rs b/crates/core/src/object_store.rs
index 3eb27e10..5f9247f1 100644
--- a/crates/core/src/object_store.rs
+++ b/crates/core/src/object_store.rs
@@ -1,9 +1,45 @@
-use object_store::{
-    local::LocalFileSystem, memory::InMemory, path::Path, DynObjectStore, ObjectStoreScheme,
-};
+use std::{collections::HashMap, sync::Arc};
+
+use object_store::{local::LocalFileSystem, path::Path, DynObjectStore, ObjectStoreScheme};
+use once_cell::sync::Lazy;
+use tokio::sync::RwLock;
 use url::Url;
 
-#[cfg(feature = "object-store")]
+static OBJECT_STORE_CACHE: Lazy<RwLock<HashMap<ObjectStoreIdentifier, Arc<DynObjectStore>>>> =
+    Lazy::new(Default::default);
+
+/// Parameter set to identify and cache an object Storage
+#[derive(PartialEq, Eq, Hash)]
+struct ObjectStoreIdentifier {
+    /// A base url to the bucket.
+    // should be enough to identify cloud provider and bucket
+    base_url: Url,
+
+    /// Object Store options
+    options: Vec<(String, String)>,
+}
+
+impl ObjectStoreIdentifier {
+    fn new<I, K, V>(base_url: Url, options: I) -> Self
+    where
+        I: IntoIterator<Item = (K, V)>,
+        K: AsRef<str>,
+        V: Into<String>,
+    {
+        Self {
+            base_url,
+            options: options
+                .into_iter()
+                .map(|(k, v)| (k.as_ref().into(), v.into()))
+                .collect(),
+        }
+    }
+
+    fn get_options(&self) -> Vec<(String, String)> {
+        self.options.to_owned()
+    }
+}
+
 macro_rules! builder_env_opts {
     ($builder:ty, $url:expr, $options:expr) => {{
         let builder = $options.into_iter().fold(
@@ -13,54 +49,45 @@ macro_rules! builder_env_opts {
                 Err(_) => builder,
             },
         );
-        Box::new(builder.build()?)
+        Arc::new(builder.build()?)
     }};
 }
 
-/// Modified version of object_store::parse_url_opts that also parses env
-///
-/// It does the same, except we start from env vars, then apply url and then overrides from options
-///
-/// This is POC. 
To improve on this idea, maybe it's good to "cache" a box with dynamic ObjectStore for each bucket we access, since ObjectStore has some logic inside tied to a bucket level, like connection pooling, credential caching
-pub fn parse_url_opts<I, K, V>(
+fn create_object_store<I, K, V>(
+    scheme: ObjectStoreScheme,
     url: &Url,
     options: I,
-) -> Result<(Box<DynObjectStore>, Path), object_store::Error>
+) -> Result<Arc<DynObjectStore>, object_store::Error>
 where
     I: IntoIterator<Item = (K, V)>,
     K: AsRef<str>,
     V: Into<String>,
 {
-    let _options = options;
-    let (scheme, path) = ObjectStoreScheme::parse(url)?;
-    let path = Path::parse(path)?;
-
-    let store: Box<DynObjectStore> = match scheme {
-        ObjectStoreScheme::Local => Box::new(LocalFileSystem::new()),
-        ObjectStoreScheme::Memory => Box::new(InMemory::new()),
+    let store: Arc<DynObjectStore> = match scheme {
+        ObjectStoreScheme::Local => Arc::new(LocalFileSystem::new()),
         #[cfg(feature = "object-store-aws")]
         ObjectStoreScheme::AmazonS3 => {
-            builder_env_opts!(object_store::aws::AmazonS3Builder, url, _options)
+            builder_env_opts!(object_store::aws::AmazonS3Builder, url, options)
         }
         #[cfg(feature = "object-store-gcp")]
         ObjectStoreScheme::GoogleCloudStorage => {
-            builder_env_opts!(object_store::gcp::GoogleCloudStorageBuilder, url, _options)
+            builder_env_opts!(object_store::gcp::GoogleCloudStorageBuilder, url, options)
        }
         #[cfg(feature = "object-store-azure")]
         ObjectStoreScheme::MicrosoftAzure => {
-            builder_env_opts!(object_store::azure::MicrosoftAzureBuilder, url, _options)
+            builder_env_opts!(object_store::azure::MicrosoftAzureBuilder, url, options)
         }
         #[cfg(feature = "object-store-http")]
         ObjectStoreScheme::Http => {
             let url = &url[..url::Position::BeforePath];
-            let builder = _options.into_iter().fold(
+            let builder = options.into_iter().fold(
                 object_store::http::HttpBuilder::new().with_url(url.to_string()),
                 |builder, (key, value)| match key.as_ref().parse() {
                     Ok(k) => builder.with_config(k, value),
                     Err(_) => builder,
                 },
             );
-            Box::new(builder.build()?)
+            Arc::new(builder.build()?)
         }
         s => {
             return Err(object_store::Error::Generic {
@@ -69,5 +96,101 @@ where
             })
         }
     };
-    Ok((store, path))
+    Ok(store)
+}
+
+/// Modified version of object_store::parse_url_opts that also parses env
+///
+/// It does the same, except we start from env vars, then apply url and then overrides from options
+///
+/// This is POC. To improve on this idea, maybe it's good to "cache" a box with dynamic ObjectStore for each bucket we access, since ObjectStore has some logic inside tied to a bucket level, like connection pooling, credential caching
+pub async fn parse_url_opts<I, K, V>(
+    url: &Url,
+    options: I,
+) -> Result<(Arc<DynObjectStore>, Path), crate::Error>
+where
+    I: IntoIterator<Item = (K, V)>,
+    K: AsRef<str>,
+    V: Into<String>,
+{
+    // TODO: Handle error properly
+    let (scheme, path) = ObjectStoreScheme::parse(url).unwrap();
+
+    let path_string: String = path.clone().into();
+    let path_str = path_string.as_str();
+    // TODO: Handle error properly
+    let base_url = url[..]
+        .strip_suffix(path_str)
+        .unwrap_or_default()
+        .try_into()
+        .unwrap();
+
+    let object_store_id = ObjectStoreIdentifier::new(base_url, options);
+    let options = object_store_id.get_options();
+
+    {
+        let cache = OBJECT_STORE_CACHE.read().await;
+        if let Some(store) = cache.get(&object_store_id) {
+            return Ok((store.clone(), path));
+        }
+    }
+
+    let store = create_object_store(scheme, url, options)?;
+    {
+        let mut cache = OBJECT_STORE_CACHE.write().await;
+
+        // TODO: Do we need this cache clean? What is a reasonable cache size here? 
+ if cache.len() >= 8 { + cache.clear() + } + _ = cache.insert(object_store_id, store.clone()); + } + + Ok((store.clone(), path)) +} + +#[cfg(test)] +mod tests { + use url::Url; + + use super::*; + + #[tokio::test] + async fn cache_works() { + let url = Url::parse("s3://bucket/item").unwrap(); + let options: Vec<(String, String)> = Vec::new(); + + let (store1, _path) = parse_url_opts(&url, options.clone()).await.unwrap(); + + let url2 = Url::parse("s3://bucket/item2").unwrap(); + let (store2, _path) = parse_url_opts(&url2, options.clone()).await.unwrap(); + + assert!(Arc::ptr_eq(&store1, &store2)); + } + #[tokio::test] + async fn different_options() { + let url = Url::parse("s3://bucket/item").unwrap(); + let options: Vec<(String, String)> = Vec::new(); + + let (store, _path) = parse_url_opts(&url, options).await.unwrap(); + + let url2 = Url::parse("s3://bucket/item2").unwrap(); + let options2: Vec<(String, String)> = vec![(String::from("some"), String::from("option"))]; + let (store2, _path) = parse_url_opts(&url2, options2).await.unwrap(); + + assert!(!Arc::ptr_eq(&store, &store2)); + } + #[tokio::test] + async fn different_urls() { + let url = Url::parse("s3://bucket/item").unwrap(); + let options: Vec<(String, String)> = Vec::new(); + + let (store, _path) = parse_url_opts(&url, options.clone()).await.unwrap(); + + let url2 = Url::parse("s3://other-bucket/item").unwrap(); + // let options2: Vec<(String, String)> = vec![(String::from("some"), String::from("option"))]; + let (store2, _path) = parse_url_opts(&url2, options).await.unwrap(); + + assert!(!Arc::ptr_eq(&store, &store2)); + } } From a0a4a8705ff81331c5a657b642bd0e6dddc3681f Mon Sep 17 00:00:00 2001 From: Oleksii Vykaliuk Date: Sat, 21 Dec 2024 23:54:32 +0100 Subject: [PATCH 3/4] feat: new crate stac-object-store-cache and import from it in core --- Cargo.toml | 2 + crates/core/Cargo.toml | 4 +- crates/core/src/error.rs | 5 + crates/core/src/format.rs | 8 +- crates/core/src/lib.rs | 5 - crates/object-store-cache/Cargo.toml | 35 ++++++ crates/object-store-cache/README.md | 0 .../src/cache.rs} | 117 +++++++++++++----- crates/object-store-cache/src/error.rs | 26 ++++ crates/object-store-cache/src/lib.rs | 16 +++ 10 files changed, 175 insertions(+), 43 deletions(-) create mode 100644 crates/object-store-cache/Cargo.toml create mode 100644 crates/object-store-cache/README.md rename crates/{core/src/object_store.rs => object-store-cache/src/cache.rs} (61%) create mode 100644 crates/object-store-cache/src/error.rs create mode 100644 crates/object-store-cache/src/lib.rs diff --git a/Cargo.toml b/Cargo.toml index 658acc9b..bbf1e8a4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,7 @@ members = [ "crates/derive", "crates/duckdb", "crates/extensions", + "crates/object-store-cache", "crates/pgstac", "crates/server", ] @@ -76,6 +77,7 @@ stac-api = { version = "0.6.2", path = "crates/api" } stac-derive = { version = "0.1.0", path = "crates/derive" } stac-duckdb = { version = "0.0.3", path = "crates/duckdb" } stac-server = { version = "0.3.2", path = "crates/server" } +stac-object-store-cache = { version = "0.1.0", path = "crates/object-store-cache" } syn = "2.0" tempfile = "3.13" thiserror = "2.0" diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 28218856..ff479e1b 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -31,7 +31,7 @@ geoparquet-compression = [ "parquet/lz4", "parquet/zstd", ] -object-store = ["dep:object_store", "dep:tokio", "dep:once_cell"] +object-store = ["dep:object_store", "dep:tokio", 
"dep:stac-object-store-cache"] object-store-aws = ["object-store", "object_store/aws"] object-store-azure = ["object-store", "object_store/azure"] object-store-gcp = ["object-store", "object_store/gcp"] @@ -63,12 +63,12 @@ jsonschema = { workspace = true, optional = true } log.workspace = true mime.workspace = true object_store = { workspace = true, optional = true } -once_cell = { workspace = true, optional = true } parquet = { workspace = true, optional = true } reqwest = { workspace = true, features = ["json", "blocking"], optional = true } serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true, features = ["preserve_order"] } stac-derive.workspace = true +stac-object-store-cache = { workspace = true, optional = true } thiserror.workspace = true tokio = { workspace = true, optional = true } tracing.workspace = true diff --git a/crates/core/src/error.rs b/crates/core/src/error.rs index e597618f..633db317 100644 --- a/crates/core/src/error.rs +++ b/crates/core/src/error.rs @@ -86,6 +86,11 @@ pub enum Error { #[error("json value is not an object")] NotAnObject(serde_json::Value), + /// [stac-object-store-cache::Error] + #[error(transparent)] + #[cfg(feature = "object-store")] + ObjectStoreCache(#[from] stac_object_store_cache::Error), + /// [object_store::Error] #[error(transparent)] #[cfg(feature = "object-store")] diff --git a/crates/core/src/format.rs b/crates/core/src/format.rs index aae86a7d..87703d88 100644 --- a/crates/core/src/format.rs +++ b/crates/core/src/format.rs @@ -156,7 +156,8 @@ impl Format { let href = href.into(); match href.realize() { RealizedHref::Url(url) => { - let (object_store, path) = crate::parse_url_opts(&url, options).await?; + let (object_store, path) = + stac_object_store_cache::parse_url_opts(&url, options).await?; let get_result = object_store.get(&path).await?; let mut value: T = self.from_bytes(get_result.bytes().await?)?; *value.self_href_mut() = Some(Href::Url(url)); @@ -235,9 +236,8 @@ impl Format { { let href = href.to_string(); if let Ok(url) = url::Url::parse(&href) { - use object_store::ObjectStore; - - let (object_store, path) = object_store::parse_url_opts(&url, options)?; + let (object_store, path) = + stac_object_store_cache::parse_url_opts(&url, options).await?; let bytes = self.into_vec(value)?; let put_result = object_store.put(&path, bytes.into()).await?; Ok(Some(put_result)) diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index 101abc43..6bea547d 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -178,8 +178,6 @@ pub mod mime; mod ndjson; mod node; #[cfg(feature = "object-store")] -mod object_store; -#[cfg(feature = "object-store")] mod resolver; mod statistics; #[cfg(feature = "validate")] @@ -189,9 +187,6 @@ mod version; use std::fmt::Display; -#[cfg(feature = "object-store")] -pub use object_store::parse_url_opts; - #[cfg(feature = "object-store")] pub use resolver::Resolver; #[cfg(feature = "validate-blocking")] diff --git a/crates/object-store-cache/Cargo.toml b/crates/object-store-cache/Cargo.toml new file mode 100644 index 00000000..3ef4816e --- /dev/null +++ b/crates/object-store-cache/Cargo.toml @@ -0,0 +1,35 @@ +[package] +name = "stac-object-store-cache" +description = "Create and cache object stores based on url in stac." 
+version = "0.1.0"
+authors.workspace = true
+edition.workspace = true
+homepage.workspace = true
+repository.workspace = true
+license.workspace = true
+categories.workspace = true
+rust-version.workspace = true
+
+[features]
+object-store-aws = ["object_store/aws"]
+object-store-azure = ["object_store/azure"]
+object-store-gcp = ["object_store/gcp"]
+object-store-http = ["object_store/http"]
+object-store-all = [
+    "object-store-aws",
+    "object-store-azure",
+    "object-store-gcp",
+    "object-store-http",
+]
+
+
+[dependencies]
+object_store = { workspace = true }
+once_cell = { workspace = true }
+thiserror.workspace = true
+tokio = { workspace = true }
+url = { workspace = true, features = ["serde"] }
+
+[dev-dependencies]
+tokio = { workspace = true, features = ["macros"] }
+tokio-test.workspace = true
diff --git a/crates/object-store-cache/README.md b/crates/object-store-cache/README.md
new file mode 100644
index 00000000..e69de29b
diff --git a/crates/core/src/object_store.rs b/crates/object-store-cache/src/cache.rs
similarity index 61%
rename from crates/core/src/object_store.rs
rename to crates/object-store-cache/src/cache.rs
index 5f9247f1..5bfa23b2 100644
--- a/crates/core/src/object_store.rs
+++ b/crates/object-store-cache/src/cache.rs
@@ -1,18 +1,23 @@
 use std::{collections::HashMap, sync::Arc};
 
+use crate::{Error, Result};
+
 use object_store::{local::LocalFileSystem, path::Path, DynObjectStore, ObjectStoreScheme};
 use once_cell::sync::Lazy;
 use tokio::sync::RwLock;
 use url::Url;
 
+// To avoid memory leaks, we clear the cache when it grows too big.
+// The value does not have any meaning, other than polars uses the same.
+const CACHE_SIZE: usize = 8;
+
 static OBJECT_STORE_CACHE: Lazy<RwLock<HashMap<ObjectStoreIdentifier, Arc<DynObjectStore>>>> =
     Lazy::new(Default::default);
 
-/// Parameter set to identify and cache an object Storage
-#[derive(PartialEq, Eq, Hash)]
+/// Parameter set to identify and cache an object store
+#[derive(PartialEq, Eq, Hash, Debug)]
 struct ObjectStoreIdentifier {
     /// A base url to the bucket.
-    // should be enough to identify cloud provider and bucket
     base_url: Url,
 
     /// Object Store options
@@ -40,6 +45,11 @@ impl ObjectStoreIdentifier {
     }
 }
 
+#[cfg(any(
+    feature = "object-store-aws",
+    feature = "object-store-gcp",
+    feature = "object-store-azure"
+))]
 macro_rules! builder_env_opts {
     ($builder:ty, $url:expr, $options:expr) => {{
         let builder = $options.into_iter().fold(
@@ -53,11 +63,24 @@ macro_rules! builder_env_opts {
     }};
 }
 
+/// This was yanked from [object_store::parse_url_opts] with the following changes:
+///
+/// - Build [object_store::ObjectStore] with environment variables
+/// - Return [Arc] instead of [Box]
+#[cfg_attr(
    not(any(
        feature = "object-store-aws",
        feature = "object-store-gcp",
        feature = "object-store-azure",
        feature = "object-store-http"
    )),
    allow(unused_variables)
)]
 fn create_object_store<I, K, V>(
     scheme: ObjectStoreScheme,
     url: &Url,
     options: I,
-) -> Result<Arc<DynObjectStore>, object_store::Error>
+) -> Result<Arc<DynObjectStore>>
 where
     I: IntoIterator<Item = (K, V)>,
     K: AsRef<str>,
     V: Into<String>,
 {
@@ -89,61 +112,49 @@ where
             );
             Arc::new(builder.build()?)
         }
-        s => {
-            return Err(object_store::Error::Generic {
-                store: "parse_url",
-                source: format!("feature for {s:?} not enabled").into(),
-            })
-        }
+        s => return Err(Error::ObjectStoreCreate { scheme: s }),
     };
     Ok(store)
 }
 
-/// Modified version of object_store::parse_url_opts that also parses env
+/// Drop-in replacement for [object_store::parse_url_opts] with caching and env vars.
 ///
-/// It does the same, except we start from env vars, then apply url and then overrides from options
-///
-/// This is POC. 
To improve on this idea, maybe it's good to "cache" a box with dynamic ObjectStore for each bucket we access, since ObjectStore has some logic inside tied to a bucket level, like connection pooling, credential caching
+/// It will create or retrieve an object store based on the passed `url` and `options`.
+/// Keeps a global cache.
 pub async fn parse_url_opts<I, K, V>(
     url: &Url,
     options: I,
-) -> Result<(Arc<DynObjectStore>, Path), crate::Error>
+) -> crate::Result<(Arc<DynObjectStore>, Path)>
 where
     I: IntoIterator<Item = (K, V)>,
     K: AsRef<str>,
     V: Into<String>,
 {
-    // TODO: Handle error properly
-    let (scheme, path) = ObjectStoreScheme::parse(url).unwrap();
-
-    let path_string: String = path.clone().into();
-    let path_str = path_string.as_str();
-    // TODO: Handle error properly
-    let base_url = url[..]
-        .strip_suffix(path_str)
+    let (scheme, path) = ObjectStoreScheme::parse(url).map_err(object_store::Error::from)?;
+
+    let base_url = url
+        .as_ref()
+        .strip_suffix(path.as_ref())
         .unwrap_or_default()
-        .try_into()
-        .unwrap();
+        .try_into()?;
 
     let object_store_id = ObjectStoreIdentifier::new(base_url, options);
     let options = object_store_id.get_options();
 
     {
         let cache = OBJECT_STORE_CACHE.read().await;
-        if let Some(store) = cache.get(&object_store_id) {
+        if let Some(store) = (*cache).get(&object_store_id) {
             return Ok((store.clone(), path));
         }
     }
-
     let store = create_object_store(scheme, url, options)?;
     {
         let mut cache = OBJECT_STORE_CACHE.write().await;
 
-        // TODO: Do we need this cache clean? What is a reasonable cache size here?
-        if cache.len() >= 8 {
-            cache.clear()
+        if cache.len() >= CACHE_SIZE {
+            (*cache).clear()
         }
-        _ = cache.insert(object_store_id, store.clone());
+        _ = (*cache).insert(object_store_id, store.clone());
     }
 
     Ok((store.clone(), path))
@@ -155,18 +166,58 @@ mod tests {
 
     use super::*;
 
+    #[tokio::test]
+    async fn file_different_path() {
+        let options: Vec<(String, String)> = Vec::new();
+
+        let url = Url::parse("file:///some/path").unwrap();
+        let (store, path) = parse_url_opts(&url, options.clone()).await.unwrap();
+
+        let url2 = Url::parse("file:///other/path").unwrap();
+        let (store2, _) = parse_url_opts(&url2, options.clone()).await.unwrap();
+
+        {
+            let cache = OBJECT_STORE_CACHE.read().await;
+            println!("{cache:#?}")
+        }
+
+        assert!(Arc::ptr_eq(&store, &store2));
+        assert!(std::ptr::addr_eq(Arc::as_ptr(&store), Arc::as_ptr(&store2)));
+        // assert_eq!(store.as_ref(), store2.as_ref());
+        // assert_eq!(Arc::as_ptr(&store), Arc::as_ptr(&store2));
+        assert_eq!(path.as_ref(), "some/path");
+    }
+
+    #[tokio::test]
+    async fn file_different_options() {
+        let options: Vec<(String, String)> = Vec::new();
+
+        let url = Url::parse("file:///some/path").unwrap();
+        let (store, _) = parse_url_opts(&url, options).await.unwrap();
+
+        let options2: Vec<(String, String)> = vec![(String::from("some"), String::from("option"))];
+        let url2 = Url::parse("file:///some/path").unwrap();
+        let (store2, _) = parse_url_opts(&url2, options2).await.unwrap();
+
+        assert!(!Arc::ptr_eq(&store, &store2));
+    }
+
+    #[cfg(feature = "object-store-aws")]
     #[tokio::test]
     async fn cache_works() {
         let url = Url::parse("s3://bucket/item").unwrap();
         let options: Vec<(String, String)> = Vec::new();
 
-        let (store1, _path) = parse_url_opts(&url, options.clone()).await.unwrap();
+        let (store1, path) = parse_url_opts(&url, options.clone()).await.unwrap();
 
         let url2 = Url::parse("s3://bucket/item2").unwrap();
         let (store2, _path) = parse_url_opts(&url2, options.clone()).await.unwrap();
 
         assert!(Arc::ptr_eq(&store1, &store2));
+        assert_eq!(path.as_ref(), "item");
     }
+
+    #[cfg(feature = "object-store-aws")]
     #[tokio::test]
     async fn different_options() {
         let url = Url::parse("s3://bucket/item").unwrap();
         let options: Vec<(String, String)> = Vec::new();
@@ -180,6 +231,8 @@ mod tests {
         assert!(!Arc::ptr_eq(&store, &store2));
     }
+
+    #[cfg(feature = "object-store-aws")]
     #[tokio::test]
     async fn different_urls() {
         let url = Url::parse("s3://bucket/item").unwrap();
diff --git a/crates/object-store-cache/src/error.rs b/crates/object-store-cache/src/error.rs
new file mode 100644
index 00000000..d71f547e
--- /dev/null
+++ b/crates/object-store-cache/src/error.rs
@@ -0,0 +1,26 @@
+use object_store::ObjectStoreScheme;
+use thiserror::Error;
+
+/// Error enum for crate-specific errors.
+#[derive(Error, Debug)]
+#[non_exhaustive]
+pub enum Error {
+    /// TODO: Better error description
+    #[error("Failed to create object_store for {scheme:?}. Check if required feature is enabled.")]
+    ObjectStoreCreate {
+        /// feature
+        scheme: ObjectStoreScheme,
+    },
+
+    /// [url::ParseError]
+    #[error(transparent)]
+    UrlParse(#[from] url::ParseError),
+
+    /// [object_store::Error]
+    #[error(transparent)]
+    ObjectStore(#[from] object_store::Error),
+
+    /// [object_store::path::Error]
+    #[error(transparent)]
+    ObjectStorePath(#[from] object_store::path::Error),
+}
diff --git a/crates/object-store-cache/src/lib.rs b/crates/object-store-cache/src/lib.rs
new file mode 100644
index 00000000..ebe3c379
--- /dev/null
+++ b/crates/object-store-cache/src/lib.rs
@@ -0,0 +1,16 @@
+//! Work with [ObjectStore](object_store::ObjectStore) in STAC.
+//!
+//! Features:
+//! - cache used object stores based on url and options
+//! - read cloud credentials from env
+//!
+
+mod cache;
+mod error;
+
+pub use cache::parse_url_opts;
+
+pub use error::Error;
+
+/// Custom [Result](std::result::Result) type for this crate.
+pub type Result<T> = std::result::Result<T, Error>;
From fe6a4a5897773c68d459eaba4f648b4241ecc6b5 Mon Sep 17 00:00:00 2001
From: Oleksii Vykaliuk
Date: Sun, 22 Dec 2024 00:15:56 +0100
Subject: [PATCH 4/4] ci: add object-store-cache crate to ci

---
 .github/workflows/ci.yml               | 8 ++++++++
 crates/object-store-cache/src/cache.rs | 2 --
 crates/object-store-cache/src/error.rs | 1 -
 3 files changed, 8 insertions(+), 3 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 823e78cf..bade9b4a 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -99,6 +99,14 @@ jobs:
       - uses: Swatinem/rust-cache@v2
       - name: Test
         run: cargo test -p stac-extensions
+  test-object-store-cache:
+    name: Test stac-object-store-cache
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v4
+      - uses: Swatinem/rust-cache@v2
+      - name: Test
+        run: cargo test -p stac-object-store-cache --all-features
   test-pgstac:
     name: Test pgstac
     runs-on: ubuntu-latest
     steps:
       - uses: actions/checkout@v4
       - uses: Swatinem/rust-cache@v2
       - name: Test
diff --git a/crates/object-store-cache/src/cache.rs b/crates/object-store-cache/src/cache.rs
index 5bfa23b2..e836ccca 100644
--- a/crates/object-store-cache/src/cache.rs
+++ b/crates/object-store-cache/src/cache.rs
@@ -183,8 +183,6 @@ mod tests {
 
         assert!(Arc::ptr_eq(&store, &store2));
         assert!(std::ptr::addr_eq(Arc::as_ptr(&store), Arc::as_ptr(&store2)));
-        // assert_eq!(store.as_ref(), store2.as_ref());
-        // assert_eq!(Arc::as_ptr(&store), Arc::as_ptr(&store2));
         assert_eq!(path.as_ref(), "some/path");
     }
 
diff --git a/crates/object-store-cache/src/error.rs b/crates/object-store-cache/src/error.rs
index d71f547e..e4a90ef9 100644
--- a/crates/object-store-cache/src/error.rs
+++ b/crates/object-store-cache/src/error.rs
@@ -5,7 +5,6 @@ use thiserror::Error;
 #[derive(Error, Debug)]
 #[non_exhaustive]
 pub enum Error {
- 
/// TODO: Better error description #[error("Failed to create object_store for {scheme:?}. Check if required feature is enabled.")] ObjectStoreCreate { /// feature
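Taken together, the end state of the series is the stac-object-store-cache crate, whose parse_url_opts both reads cloud credentials from the environment and keeps one store per bucket/options pair in a global cache. A minimal usage sketch, assuming the crate's object-store-aws feature, a tokio runtime, and an illustrative bucket name (the sketch mirrors the crate's own cache_works test):

    use stac_object_store_cache::parse_url_opts;
    use std::sync::Arc;
    use url::Url;

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        let options: Vec<(String, String)> = Vec::new();

        // The first call builds an S3 store from environment credentials; the second call
        // for the same bucket and options is served from the global cache.
        let url_a = Url::parse("s3://my-bucket/a.json")?;
        let url_b = Url::parse("s3://my-bucket/b.json")?;
        let (store_a, path_a) = parse_url_opts(&url_a, options.clone()).await?;
        let (store_b, _path_b) = parse_url_opts(&url_b, options.clone()).await?;

        assert!(Arc::ptr_eq(&store_a, &store_b)); // same cached store instance
        assert_eq!(path_a.as_ref(), "a.json");
        Ok(())
    }

Passing a different options set, or a different bucket, produces (and caches) a separate store, and the cache is cleared once it holds CACHE_SIZE entries, as shown in the cache.rs diff above.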