diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 823e78cf..bade9b4a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -99,6 +99,14 @@ jobs: - uses: Swatinem/rust-cache@v2 - name: Test run: cargo test -p stac-extensions + test-object-store-cache: + name: Test stac-object-store-cache + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: Swatinem/rust-cache@v2 + - name: Test + run: cargo test -p stac-object-store-cache --all-features test-pgstac: name: Test pgstac runs-on: ubuntu-latest diff --git a/Cargo.toml b/Cargo.toml index 6a291dd7..bbf1e8a4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,7 @@ members = [ "crates/derive", "crates/duckdb", "crates/extensions", + "crates/object-store-cache", "crates/pgstac", "crates/server", ] @@ -57,6 +58,7 @@ log = "0.4.22" mime = "0.3.17" mockito = "1.5" object_store = "0.11.0" +once_cell = "1.20.2" openssl = { version = "0.10.68", features = ["vendored"] } openssl-src = "=300.4.1" # joinked from https://github.com/iopsystems/rpc-perf/commit/705b290d2105af6f33150da04b217422c6d68701#diff-2e9d962a08321605940b5a657135052fbcef87b5e360662bb527c96d9a615542R41 to cross-compile Python parquet = { version = "52.2", default-features = false } @@ -75,6 +77,7 @@ stac-api = { version = "0.6.2", path = "crates/api" } stac-derive = { version = "0.1.0", path = "crates/derive" } stac-duckdb = { version = "0.0.3", path = "crates/duckdb" } stac-server = { version = "0.3.2", path = "crates/server" } +stac-object-store-cache = { version = "0.1.0", path = "crates/object-store-cache" } syn = "2.0" tempfile = "3.13" thiserror = "2.0" diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index acf643bb..ff479e1b 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -31,7 +31,7 @@ geoparquet-compression = [ "parquet/lz4", "parquet/zstd", ] -object-store = ["dep:object_store", "dep:tokio"] +object-store = ["dep:object_store", "dep:tokio", 
"dep:stac-object-store-cache"] object-store-aws = ["object-store", "object_store/aws"] object-store-azure = ["object-store", "object_store/azure"] object-store-gcp = ["object-store", "object_store/gcp"] @@ -68,6 +68,7 @@ reqwest = { workspace = true, features = ["json", "blocking"], optional = true } serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true, features = ["preserve_order"] } stac-derive.workspace = true +stac-object-store-cache = { workspace = true, optional = true } thiserror.workspace = true tokio = { workspace = true, optional = true } tracing.workspace = true diff --git a/crates/core/src/error.rs b/crates/core/src/error.rs index e597618f..633db317 100644 --- a/crates/core/src/error.rs +++ b/crates/core/src/error.rs @@ -86,6 +86,11 @@ pub enum Error { #[error("json value is not an object")] NotAnObject(serde_json::Value), + /// [stac-object-store-cache::Error] + #[error(transparent)] + #[cfg(feature = "object-store")] + ObjectStoreCache(#[from] stac_object_store_cache::Error), + /// [object_store::Error] #[error(transparent)] #[cfg(feature = "object-store")] diff --git a/crates/core/src/format.rs b/crates/core/src/format.rs index 9c3ff4b8..87703d88 100644 --- a/crates/core/src/format.rs +++ b/crates/core/src/format.rs @@ -156,9 +156,8 @@ impl Format { let href = href.into(); match href.realize() { RealizedHref::Url(url) => { - use object_store::ObjectStore; - - let (object_store, path) = object_store::parse_url_opts(&url, options)?; + let (object_store, path) = + stac_object_store_cache::parse_url_opts(&url, options).await?; let get_result = object_store.get(&path).await?; let mut value: T = self.from_bytes(get_result.bytes().await?)?; *value.self_href_mut() = Some(Href::Url(url)); @@ -237,9 +236,8 @@ impl Format { { let href = href.to_string(); if let Ok(url) = url::Url::parse(&href) { - use object_store::ObjectStore; - - let (object_store, path) = object_store::parse_url_opts(&url, options)?; + let (object_store, path) 
= + stac_object_store_cache::parse_url_opts(&url, options).await?; let bytes = self.into_vec(value)?; let put_result = object_store.put(&path, bytes.into()).await?; Ok(Some(put_result)) diff --git a/crates/object-store-cache/Cargo.toml b/crates/object-store-cache/Cargo.toml new file mode 100644 index 00000000..3ef4816e --- /dev/null +++ b/crates/object-store-cache/Cargo.toml @@ -0,0 +1,35 @@ +[package] +name = "stac-object-store-cache" +description = "Create and cache object stores based on url in stac." +version = "0.1.0" +authors.workspace = true +edition.workspace = true +homepage.workspace = true +repository.workspace = true +license.workspace = true +categories.workspace = true +rust-version.workspace = true + +[features] +object-store-aws = ["object_store/aws"] +object-store-azure = ["object_store/azure"] +object-store-gcp = ["object_store/gcp"] +object-store-http = ["object_store/http"] +object-store-all = [ + "object-store-aws", + "object-store-azure", + "object-store-gcp", + "object-store-http", +] + + +[dependencies] +object_store = { workspace = true } +once_cell = { workspace = true } +thiserror.workspace = true +tokio = { workspace = true } +url = { workspace = true, features = ["serde"] } + +[dev-dependencies] +tokio = { workspace = true, features = ["macros"] } +tokio-test.workspace = true diff --git a/crates/object-store-cache/README.md b/crates/object-store-cache/README.md new file mode 100644 index 00000000..e69de29b diff --git a/crates/object-store-cache/src/cache.rs b/crates/object-store-cache/src/cache.rs new file mode 100644 index 00000000..e836ccca --- /dev/null +++ b/crates/object-store-cache/src/cache.rs @@ -0,0 +1,247 @@ +use std::{collections::HashMap, sync::Arc}; + +use crate::{Error, Result}; + +use object_store::{local::LocalFileSystem, path::Path, DynObjectStore, ObjectStoreScheme}; +use once_cell::sync::Lazy; +use tokio::sync::RwLock; +use url::Url; + +// To avoid memory leaks, we clear the cache when it grows too big. 
+// The value has no special meaning; it is simply the same limit that polars uses. +const CACHE_SIZE: usize = 8; + +static OBJECT_STORE_CACHE: Lazy>>> = + Lazy::new(Default::default); + +/// Parameter set to identify and cache an object store +#[derive(PartialEq, Eq, Hash, Debug)] +struct ObjectStoreIdentifier { + /// A base url to the bucket. + base_url: Url, + + /// Object Store options + options: Vec<(String, String)>, +} + +impl ObjectStoreIdentifier { + fn new(base_url: Url, options: I) -> Self + where + I: IntoIterator, + K: AsRef, + V: Into, + { + Self { + base_url, + options: options + .into_iter() + .map(|(k, v)| (k.as_ref().into(), v.into())) + .collect(), + } + } + + fn get_options(&self) -> Vec<(String, String)> { + self.options.to_owned() + } +} + +#[cfg(any( + feature = "object-store-aws", + feature = "object-store-gcp", + feature = "object-store-azure" +))] +macro_rules! builder_env_opts { + ($builder:ty, $url:expr, $options:expr) => {{ + let builder = $options.into_iter().fold( + <$builder>::from_env().with_url($url.to_string()), + |builder, (key, value)| match key.as_ref().parse() { + Ok(k) => builder.with_config(k, value), + Err(_) => builder, + }, + ); + Arc::new(builder.build()?) 
+ }}; +} + +/// This was yanked from [object_store::parse_url_opts] with the following changes: +/// +/// - Build [object_store::ObjectStore] with environment variables +/// - Return [Arc] instead of [Box] +#[cfg_attr( + not(any( + feature = "object-store-aws", + feature = "object-store-gcp", + feature = "object-store-azure", + feature = "object-store-http" + )), + allow(unused_variables) +)] +fn create_object_store( + scheme: ObjectStoreScheme, + url: &Url, + options: I, +) -> Result> +where + I: IntoIterator, + K: AsRef, + V: Into, +{ + let store: Arc = match scheme { + ObjectStoreScheme::Local => Arc::new(LocalFileSystem::new()), + #[cfg(feature = "object-store-aws")] + ObjectStoreScheme::AmazonS3 => { + builder_env_opts!(object_store::aws::AmazonS3Builder, url, options) + } + #[cfg(feature = "object-store-gcp")] + ObjectStoreScheme::GoogleCloudStorage => { + builder_env_opts!(object_store::gcp::GoogleCloudStorageBuilder, url, options) + } + #[cfg(feature = "object-store-azure")] + ObjectStoreScheme::MicrosoftAzure => { + builder_env_opts!(object_store::azure::MicrosoftAzureBuilder, url, options) + } + #[cfg(feature = "object-store-http")] + ObjectStoreScheme::Http => { + let url = &url[..url::Position::BeforePath]; + let builder = options.into_iter().fold( + object_store::http::HttpBuilder::new().with_url(url.to_string()), + |builder, (key, value)| match key.as_ref().parse() { + Ok(k) => builder.with_config(k, value), + Err(_) => builder, + }, + ); + Arc::new(builder.build()?) + } + s => return Err(Error::ObjectStoreCreate { scheme: s }), + }; + Ok(store) +} + +/// Drop-in replacement for [object_store::parse_url_opts] with caching and env vars. +/// +/// It will create or retrieve object store based on passed `url` and `options`. 
+/// Keeps global cache +pub async fn parse_url_opts( + url: &Url, + options: I, +) -> crate::Result<(Arc, Path)> +where + I: IntoIterator, + K: AsRef, + V: Into, +{ + let (scheme, path) = ObjectStoreScheme::parse(url).map_err(object_store::Error::from)?; + + let base_url = url + .as_ref() + .strip_suffix(path.as_ref()) + .unwrap_or_default() + .try_into()?; + + let object_store_id = ObjectStoreIdentifier::new(base_url, options); + let options = object_store_id.get_options(); + + { + let cache = OBJECT_STORE_CACHE.read().await; + if let Some(store) = (*cache).get(&object_store_id) { + return Ok((store.clone(), path)); + } + } + let store = create_object_store(scheme, url, options)?; + { + let mut cache = OBJECT_STORE_CACHE.write().await; + + if cache.len() >= CACHE_SIZE { + (*cache).clear() + } + _ = (*cache).insert(object_store_id, store.clone()); + } + + Ok((store.clone(), path)) +} + +#[cfg(test)] +mod tests { + use url::Url; + + use super::*; + + #[tokio::test] + async fn file_different_path() { + let options: Vec<(String, String)> = Vec::new(); + + let url = Url::parse("file:///some/path").unwrap(); + let (store, path) = parse_url_opts(&url, options.clone()).await.unwrap(); + + let url2 = Url::parse("file:///other/path").unwrap(); + let (store2, _) = parse_url_opts(&url2, options.clone()).await.unwrap(); + + { + let cache = OBJECT_STORE_CACHE.read().await; + println!("{cache:#?}") + } + + assert!(Arc::ptr_eq(&store, &store2)); + assert!(std::ptr::addr_eq(Arc::as_ptr(&store), Arc::as_ptr(&store2))); + assert_eq!(path.as_ref(), "some/path"); + } + + #[tokio::test] + async fn file_different_options() { + let options: Vec<(String, String)> = Vec::new(); + + let url = Url::parse("file:///some/path").unwrap(); + let (store, _) = parse_url_opts(&url, options).await.unwrap(); + + let options2: Vec<(String, String)> = vec![(String::from("some"), String::from("option"))]; + let url2 = Url::parse("file:///some/path").unwrap(); + let (store2, _) = parse_url_opts(&url2, 
options2).await.unwrap(); + + assert!(!Arc::ptr_eq(&store, &store2)); + } + + #[cfg(feature = "object-store-aws")] + #[tokio::test] + async fn cache_works() { + let url = Url::parse("s3://bucket/item").unwrap(); + let options: Vec<(String, String)> = Vec::new(); + + let (store1, path) = parse_url_opts(&url, options.clone()).await.unwrap(); + + let url2 = Url::parse("s3://bucket/item2").unwrap(); + let (store2, _path) = parse_url_opts(&url2, options.clone()).await.unwrap(); + + assert!(Arc::ptr_eq(&store1, &store2)); + assert_eq!(path.as_ref(), "item"); + } + + #[cfg(feature = "object-store-aws")] + #[tokio::test] + async fn different_options() { + let url = Url::parse("s3://bucket/item").unwrap(); + let options: Vec<(String, String)> = Vec::new(); + + let (store, _path) = parse_url_opts(&url, options).await.unwrap(); + + let url2 = Url::parse("s3://bucket/item2").unwrap(); + let options2: Vec<(String, String)> = vec![(String::from("some"), String::from("option"))]; + let (store2, _path) = parse_url_opts(&url2, options2).await.unwrap(); + + assert!(!Arc::ptr_eq(&store, &store2)); + } + + #[cfg(feature = "object-store-aws")] + #[tokio::test] + async fn different_urls() { + let url = Url::parse("s3://bucket/item").unwrap(); + let options: Vec<(String, String)> = Vec::new(); + + let (store, _path) = parse_url_opts(&url, options.clone()).await.unwrap(); + + let url2 = Url::parse("s3://other-bucket/item").unwrap(); + // let options2: Vec<(String, String)> = vec![(String::from("some"), String::from("option"))]; + let (store2, _path) = parse_url_opts(&url2, options).await.unwrap(); + + assert!(!Arc::ptr_eq(&store, &store2)); + } +} diff --git a/crates/object-store-cache/src/error.rs b/crates/object-store-cache/src/error.rs new file mode 100644 index 00000000..e4a90ef9 --- /dev/null +++ b/crates/object-store-cache/src/error.rs @@ -0,0 +1,25 @@ +use object_store::ObjectStoreScheme; +use thiserror::Error; + +/// Error enum for crate-specific errors. 
+#[derive(Error, Debug)] +#[non_exhaustive] +pub enum Error { + #[error("Failed to create object_store for {scheme:?}. Check if required feature is enabled.")] + ObjectStoreCreate { + /// Scheme for which no object store could be created + scheme: ObjectStoreScheme, + }, + + /// [url::ParseError] + #[error(transparent)] + UrlParse(#[from] url::ParseError), + + /// [object_store::Error] + #[error(transparent)] + ObjectStore(#[from] object_store::Error), + + /// [object_store::path::Error] + #[error(transparent)] + ObjectStorePath(#[from] object_store::path::Error), +} diff --git a/crates/object-store-cache/src/lib.rs b/crates/object-store-cache/src/lib.rs new file mode 100644 index 00000000..ebe3c379 --- /dev/null +++ b/crates/object-store-cache/src/lib.rs @@ -0,0 +1,16 @@ +//! Work with [ObjectStore](object_store::ObjectStore) in STAC. +//! +//! Features: +//! - cache used object stores based on url and options +//! - read cloud credentials from env +//! + +mod cache; +mod error; + +pub use cache::parse_url_opts; + +pub use error::Error; + +/// Custom [Result](std::result::Result) type for this crate. +pub type Result = std::result::Result;