diff --git a/.github/workflows/core.yml b/.github/workflows/core.yml index a368ca27a..74f47bd1b 100644 --- a/.github/workflows/core.yml +++ b/.github/workflows/core.yml @@ -39,14 +39,12 @@ jobs: run: cargo check -F geoarrow - name: Check w/ geoparquet run: cargo check -F geoparquet - - name: Check w/ geoparquet-compression - run: cargo check -F geoparquet-compression - - name: Check w/ object_store - run: cargo check -F object_store + - name: Check w/ object-store + run: cargo check -F object-store - name: Check w/ reqwest run: cargo check -F reqwest - name: Test - run: cargo test -F geo -F geoarrow -F geoparquet -F geoparquet-compression -F reqwest -F object_store + run: cargo test -F geo -F geoparquet-compression -F reqwest -F object-store-full test-core-with-gdal: runs-on: ubuntu-latest steps: diff --git a/cli/Cargo.toml b/cli/Cargo.toml index 48f0a8123..1dc8cf2b7 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -15,29 +15,34 @@ rust-version = "1.75" default = ["gdal", "geoparquet", "pgstac"] duckdb = ["dep:stac-duckdb", "dep:duckdb"] gdal = ["stac/gdal"] -geoparquet = ["dep:geoarrow", "stac/geoparquet-compression", "parquet"] +geoparquet = ["stac/geoparquet-compression"] pgstac = ["stac-server/pgstac"] python = ["dep:pyo3", "pgstac", "geoparquet"] [dependencies] axum = "0.7" -bytes = { version = "1" } clap = { version = "4", features = ["derive"] } -duckdb = { version = "1", optional = true } # We have this dependency only to allow us to bundle it -geoarrow = { version = "0.3", optional = true } -object_store = { version = "0.11", features = ["aws", "gcp", "azure", "http"] } -parquet = { version = "52", optional = true } +duckdb = { version = "1", optional = true } # We have this dependency only to allow us to bundle it +object_store = "0.11" pyo3 = { version = "0.22", optional = true } reqwest = "0.12" serde = "1" serde_json = "1" -stac = { version = "0.9.0", path = "../core", features = ["reqwest"] } +stac = { version = "0.9.0", path = "../core", features 
= [ + "reqwest", + "object-store-full", +] } stac-api = { version = "0.5.0", path = "../api", features = ["client"] } stac-duckdb = { version = "0.0.1", path = "../duckdb", optional = true } stac-server = { version = "0.2.0", path = "../server", features = ["axum"] } stac-validate = { version = "0.2.2", path = "../validate" } thiserror = "1" -tokio = { version = "1.23", features = ["macros", "io-std", "rt-multi-thread"] } +tokio = { version = "1.23", features = [ + "macros", + "io-std", + "rt-multi-thread", + "fs", +] } tokio-stream = "0.1" tracing = "0.1" tracing-subscriber = "0.3" diff --git a/cli/src/args/items.rs b/cli/src/args/items.rs index 086cfec57..1e12661dc 100644 --- a/cli/src/args/items.rs +++ b/cli/src/args/items.rs @@ -35,7 +35,7 @@ impl Run for Args { let mut join_set = JoinSet::new(); let mut items = Vec::with_capacity(self.hrefs.len()); for href in self.hrefs { - let input = input.with_href(&href)?; + let input = input.with_href(href.clone()); let sender = stream.clone(); let args = ItemArgs { id_or_href: href, diff --git a/cli/src/args/mod.rs b/cli/src/args/mod.rs index 242beecc0..50f6b597b 100644 --- a/cli/src/args/mod.rs +++ b/cli/src/args/mod.rs @@ -10,11 +10,11 @@ mod serve; mod translate; mod validate; -use crate::{config::Entry, input::Input, output::Output, Result, Value}; +use crate::{input::Input, options::KeyValue, output::Output, Result, Value}; use clap::Parser; +use stac::Format; use tokio::sync::mpsc::Sender; use tokio::task::JoinHandle; -use tracing::info; use tracing::metadata::Level; const BUFFER: usize = 100; @@ -25,19 +25,23 @@ const BUFFER: usize = 100; pub struct Args { /// The input format, if not provided will be inferred from the input file's extension, falling back to json #[arg(short, long, global = true)] - input_format: Option, + input_format: Option, /// key=value pairs to use for the input object store - #[arg(short = 'k', long)] - input_config: Vec, + #[arg(long = "input-option")] + input_options: Vec, /// The output 
format, if not provided will be inferred from the output file's extension, falling back to json #[arg(short, long, global = true)] - output_format: Option, + output_format: Option, /// key=value pairs to use for the output object store - #[arg(short = 'c', long)] - output_config: Vec, + #[arg(long = "output-option")] + output_options: Vec, + + /// key=value pairs to use for both the input and the output object store + #[arg(short = 'c', long = "option")] + options: Vec, /// If the output is a local file, create its parent directories before creating the file #[arg(long, default_value_t = true)] @@ -129,23 +133,30 @@ impl Args { let input = Input::new( self.subcommand.take_infile(), self.input_format, - self.input_config, - )?; + self.options + .clone() + .into_iter() + .chain(self.input_options) + .collect::>(), + ); let mut output = Output::new( self.subcommand.take_outfile(), self.output_format.or({ if self.stream { - Some(crate::output::Format::NdJson) + Some(Format::NdJson) } else { None } }), - self.output_config, + self.options + .into_iter() + .chain(self.output_options) + .collect::>(), self.create_parent_directories, ) .await?; let value = if self.stream { - if output.format != crate::output::Format::NdJson { + if output.format != Format::NdJson { tracing::warn!( "format was set to {}, but stream=true so re-setting to nd-json", output.format @@ -166,10 +177,10 @@ impl Args { }; if let Some(value) = value { if let Some(put_result) = output.put(value).await? 
{ - info!( - "put result: etag={}, version={}", - put_result.e_tag.as_deref().unwrap_or(""), - put_result.version.as_deref().unwrap_or("") + tracing::info!( + "put result: etag={} version={}", + put_result.e_tag.unwrap_or_default(), + put_result.version.unwrap_or_default() ); } } diff --git a/cli/src/args/serve.rs b/cli/src/args/serve.rs index 8e1ac91e2..50c78813b 100644 --- a/cli/src/args/serve.rs +++ b/cli/src/args/serve.rs @@ -78,12 +78,8 @@ impl Run for Args { reading_from_stdin = true; } } - let input = input.with_href(&href)?; - let _ = join_set.spawn(async move { - let mut value = input.get().await?; - value.set_href(href); - Ok(value) - }); + let input = input.with_href(href); + let _ = join_set.spawn(async move { input.get().await }); } let mut item_join_set = JoinSet::new(); let mut collections = HashSet::new(); @@ -102,7 +98,7 @@ impl Run for Args { collection.make_relative_links_absolute(href)?; for link in collection.iter_item_links() { let href = link.href.to_string(); - let input = input.with_href(&href)?; + let input = input.with_href(href); let _ = item_join_set.spawn(async move { input.get().await }); } } diff --git a/cli/src/error.rs b/cli/src/error.rs index df2bbc660..879dcf4f5 100644 --- a/cli/src/error.rs +++ b/cli/src/error.rs @@ -17,11 +17,6 @@ pub enum Error { #[error(transparent)] ObjectStorePath(#[from] object_store::path::Error), - /// [parquet::errors::ParquetError] - #[error(transparent)] - #[cfg(feature = "geoparquet")] - Parquet(#[from] parquet::errors::ParquetError), - /// [reqwest::Error] #[error(transparent)] Reqwest(#[from] reqwest::Error), diff --git a/cli/src/input.rs b/cli/src/input.rs index be639c838..06528f6d3 100644 --- a/cli/src/input.rs +++ b/cli/src/input.rs @@ -1,120 +1,43 @@ -use crate::{config::Config, Error, Result}; -use object_store::{local::LocalFileSystem, path::Path, ObjectStore}; -use stac::{Format, Item, ItemCollection, Value}; -use std::io::BufReader; -use url::Url; +use crate::{options::Options, Error, 
Result}; +use stac::{io::Config, Format, Value}; /// The input to a CLI run. #[derive(Debug, Default)] pub(crate) struct Input { - format: Format, - reader: Reader, config: Config, -} - -#[derive(Debug, Default)] -enum Reader { - ObjectStore { - object_store: Box, - path: Path, - }, - #[default] - Stdin, + href: Option, } impl Input { /// Creates a new input. pub(crate) fn new( - infile: impl Into>, + href: impl Into>, format: impl Into>, - config: impl Into, - ) -> Result { - let infile = infile + options: impl Into, + ) -> Input { + let href = href .into() - .and_then(|infile| if infile == "-" { None } else { Some(infile) }); - let format = format - .into() - .or_else(|| infile.as_deref().and_then(Format::infer_from_href)) - .unwrap_or_default(); - let config = config.into(); - let reader = if let Some(infile) = infile { - let (object_store, path) = parse_href_opts(&infile, config.iter())?; - Reader::ObjectStore { object_store, path } - } else { - Reader::Stdin - }; - Ok(Input { - format, - reader, - config, - }) + .and_then(|href| if href == "-" { None } else { Some(href) }); + let config = Config::new().format(format).options(options.into()); + Input { config, href } } /// Creates a new input with the given href. - pub(crate) fn with_href(&self, href: &str) -> Result { - let (object_store, path) = parse_href_opts(href, self.config.iter())?; - let reader = Reader::ObjectStore { object_store, path }; - Ok(Input { - format: self.format, - reader, + pub(crate) fn with_href(&self, href: impl Into>) -> Input { + Input { config: self.config.clone(), - }) + href: href.into(), + } } /// Gets a STAC value from the input. 
pub(crate) async fn get(&self) -> Result { - tracing::debug!("getting {}", self.format); - match &self.reader { - Reader::ObjectStore { object_store, path } => { - let bytes = object_store.get(path).await?.bytes().await?; - match self.format { - Format::Json => serde_json::from_slice(&bytes).map_err(Error::from), - Format::NdJson => bytes - .split(|c| *c == b'\n') - .map(|line| serde_json::from_slice::(line).map_err(Error::from)) - .collect::>>() - .map(ItemCollection::from) - .map(Value::from), - #[cfg(feature = "geoparquet")] - Format::Geoparquet => stac::geoparquet::from_reader(bytes) - .map(Value::from) - .map_err(Error::from), - } - } - Reader::Stdin => match self.format { - Format::Json => serde_json::from_reader(std::io::stdin()).map_err(Error::from), - Format::NdJson => stac::ndjson::from_buf_reader(BufReader::new(std::io::stdin())) - .map(Value::from) - .map_err(Error::from), - #[cfg(feature = "geoparquet")] - Format::Geoparquet => { - use std::io::Read; - - let mut buf = Vec::new(); - let _ = std::io::stdin().read_to_end(&mut buf)?; - stac::geoparquet::from_reader(bytes::Bytes::from(buf)) - .map(Value::from) - .map_err(Error::from) - } - }, + if let Some(href) = self.href.as_ref() { + self.config.get(href.clone()).await.map_err(Error::from) + } else { + self.config + .from_reader(std::io::stdin()) + .map_err(Error::from) } } } - -pub(crate) fn parse_href_opts( - href: &str, - options: I, -) -> Result<(Box, Path)> -where - I: IntoIterator, - K: AsRef, - V: Into, -{ - if let Ok(url) = Url::parse(href) { - object_store::parse_url_opts(&url, options).map_err(Error::from) - } else { - let path = Path::from_filesystem_path(href)?; - let object_store = LocalFileSystem::new(); - Ok((Box::new(object_store), path)) - } -} diff --git a/cli/src/lib.rs b/cli/src/lib.rs index c4a5dc8b7..6f009d856 100644 --- a/cli/src/lib.rs +++ b/cli/src/lib.rs @@ -13,7 +13,7 @@ //! 
``` #![cfg_attr(docsrs, feature(doc_auto_cfg))] -#![warn( +#![deny( elided_lifetimes_in_paths, explicit_outlives_requirements, keyword_idents, @@ -44,9 +44,9 @@ )] mod args; -mod config; mod error; mod input; +mod options; mod output; #[cfg(feature = "python")] mod python; diff --git a/cli/src/config.rs b/cli/src/options.rs similarity index 55% rename from cli/src/config.rs rename to cli/src/options.rs index aefb809a4..86cc52866 100644 --- a/cli/src/config.rs +++ b/cli/src/options.rs @@ -2,34 +2,25 @@ use std::{convert::Infallible, str::FromStr}; /// A collection of configuration entries. #[derive(Clone, Debug, Default)] -pub(crate) struct Config(Vec); +pub(crate) struct Options(Vec); /// `key=value`` pairs for object store configuration #[derive(Clone, Debug)] -pub(crate) struct Entry { +pub(crate) struct KeyValue { key: String, value: String, } -impl Config { - /// Returns an iterator over this config's key value pairs. - pub(crate) fn iter(&self) -> impl Iterator { - self.0 - .iter() - .map(|entry| (entry.key.as_str(), entry.value.as_str())) - } -} - -impl FromStr for Entry { +impl FromStr for KeyValue { type Err = Infallible; fn from_str(s: &str) -> Result { if let Some((key, value)) = s.split_once('=') { - Ok(Entry { + Ok(KeyValue { key: key.to_string(), value: value.to_string(), }) } else { - Ok(Entry { + Ok(KeyValue { key: s.to_string(), value: String::new(), }) @@ -37,8 +28,14 @@ impl FromStr for Entry { } } -impl From> for Config { - fn from(value: Vec) -> Self { - Config(value) +impl From> for Options { + fn from(value: Vec) -> Self { + Options(value) + } +} + +impl From for Vec<(String, String)> { + fn from(value: Options) -> Self { + value.0.into_iter().map(|kv| (kv.key, kv.value)).collect() } } diff --git a/cli/src/output.rs b/cli/src/output.rs index f275b4aef..e9e855ae7 100644 --- a/cli/src/output.rs +++ b/cli/src/output.rs @@ -1,8 +1,9 @@ //! Structures for writing output data. 
-use crate::{config::Config, value::Value, Error, Result}; -use object_store::{buffered::BufWriter, path::Path, ObjectStore, PutPayload, PutResult}; -use std::{fmt::Display, io::IsTerminal, pin::Pin, str::FromStr, sync::Arc}; +use crate::{options::Options, value::Value, Error, Result}; +use object_store::{local::LocalFileSystem, ObjectStore, PutResult}; +use stac::{io::Config, Format}; +use std::{io::IsTerminal, path::Path, pin::Pin}; use tokio::{ fs::File, io::{AsyncWrite, AsyncWriteExt}, @@ -10,173 +11,98 @@ use tokio::{ use url::Url; /// The output from a CLI run. -#[allow(missing_debug_implementations)] pub(crate) struct Output { - /// The output format. - pub format: Format, - - writer: Writer, -} - -/// The output format. -#[derive(Clone, Debug, PartialEq, Copy)] -pub(crate) enum Format { - /// Pretty-printed JSON, good for terminals. - PrettyJson, - /// Compact JSON, good for files. - CompactJson, - /// Newline-delimited JSON - NdJson, - /// stac-geoparquet - #[cfg(feature = "geoparquet")] - Geoparquet(parquet::basic::Compression), -} - -struct Writer { + pub(crate) format: Format, + href: Option, stream: Pin>, - object_store: Option<(Arc>, Path)>, + config: Config, } impl Output { /// Creates a new output from an optional outfile and an optional format. 
pub(crate) async fn new( - outfile: Option, + href: Option, format: Option, - config: impl Into, + options: impl Into, create_parent_directories: bool, ) -> Result { - let format = format - .or_else(|| outfile.as_deref().and_then(Format::infer_from_href)) - .unwrap_or_else(|| { - if outfile.is_some() && !std::io::stdout().is_terminal() { - Format::CompactJson - } else { - Format::PrettyJson - } - }); - let writer = if let Some(outfile) = outfile { - if let Some(url) = Url::parse(&outfile).ok().and_then(|url| { + let mut format = format + .or_else(|| href.as_deref().and_then(Format::infer_from_href)) + .unwrap_or_default(); + let config = Config::new().format(Some(format)).options(options.into()); + let stream = if let Some(href) = href.as_deref() { + if let Ok(url) = Url::parse(href) { if url.scheme() == "file" { - None + create_file_stream( + url.to_file_path().unwrap_or_default(), + create_parent_directories, + ) + .await? } else { - Some(url) - } - }) { - let (object_store, path) = - object_store::parse_url_opts(&url, config.into().iter())?; - let object_store = Arc::new(object_store); - let stream = BufWriter::new(object_store.clone(), path.clone()); - Writer { - stream: Box::pin(stream), - object_store: Some((object_store, path)), + Box::pin(config.buf_writer(&url)?) } } else { - if create_parent_directories { - if let Some(parent) = std::path::Path::new(&outfile).parent() { - std::fs::create_dir_all(parent)?; - } - } - Writer { - stream: Box::pin(File::create(outfile).await?), - object_store: None, - } + create_file_stream(href, create_parent_directories).await? 
} } else { - Writer { - stream: Box::pin(tokio::io::stdout()), - object_store: None, + if std::io::stdout().is_terminal() { + format = format.pretty(); } + Box::pin(tokio::io::stdout()) }; - Ok(Output { format, writer }) + Ok(Output { + href, + format, + stream, + config, + }) } /// Streams a value to the output pub(crate) async fn stream(&mut self, value: Value) -> Result<()> { - let bytes = value.into_bytes(Format::NdJson)?; - self.writer.stream.write_all(&bytes).await?; - self.writer.stream.flush().await?; + let bytes = value.into_ndjson()?; + self.stream.write_all(&bytes).await?; + self.stream.flush().await?; Ok(()) } /// Puts a value to the output. pub(crate) async fn put(&mut self, value: Value) -> Result> { - let bytes = value.into_bytes(self.format)?; - if let Some((object_store, path)) = &self.writer.object_store { + let bytes = match value { + Value::Json(value) => self.format.json_to_vec(value)?, + Value::Stac(value) => self.format.value_to_vec(value)?, + }; + if let Some(href) = self.href.as_deref() { + let (object_store, path): (Box, _) = match Url::parse(href) { + Ok(url) => self.config.object_store(&url)?, + Err(_) => { + let path = object_store::path::Path::from_filesystem_path(href)?; + (Box::new(LocalFileSystem::new()), path) + } + }; object_store - .put(path, PutPayload::from_bytes(bytes)) + .put(&path, bytes.into()) .await .map(Some) .map_err(Error::from) } else { - let output = self.writer.stream.write_all(&bytes).await.map(|_| None)?; - self.writer.stream.flush().await?; - Ok(output) - } - } -} - -impl FromStr for Format { - type Err = Error; - fn from_str(s: &str) -> Result { - let s = s.to_ascii_lowercase(); - match s.as_str() { - "json" | "geojson" => Ok(Self::CompactJson), - "ndjson" => Ok(Self::NdJson), - _ => { - #[cfg(feature = "geoparquet")] - if s.starts_with("parquet") || s.starts_with("geoparquet") { - if let Some((_, compression)) = s.split_once('[') { - if let Some(stop) = compression.find(']') { - 
Ok(Self::Geoparquet(compression[..stop].parse()?)) - } else { - Err(Error::UnsupportedFormat(s.to_string())) - } - } else { - Ok(Self::Geoparquet(parquet::basic::Compression::UNCOMPRESSED)) - } - } else { - Err(Error::UnsupportedFormat(s.to_string())) - } - #[cfg(not(feature = "geoparquet"))] - Err(Error::UnsupportedFormat(s.to_string())) - } + self.stream.write_all(&bytes).await?; + self.stream.flush().await?; + Ok(None) } } } -impl Display for Format { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Format::PrettyJson => f.write_str("pretty-json"), - Format::CompactJson => f.write_str("compact-json"), - Format::NdJson => f.write_str("nd-json"), - #[cfg(feature = "geoparquet")] - Format::Geoparquet(compression) => { - if *compression == parquet::basic::Compression::UNCOMPRESSED { - f.write_str("geoparquet") - } else { - write!(f, "geoparquet[{}]", compression) - } - } +async fn create_file_stream( + path: impl AsRef, + create_parent_directories: bool, +) -> Result>> { + let path = path.as_ref(); + if create_parent_directories { + if let Some(parent) = path.parent() { + tokio::fs::create_dir_all(parent).await?; } } -} - -impl Format { - fn infer_from_href(href: &str) -> Option { - href.rsplit_once('.').and_then(|(_, ext)| ext.parse().ok()) - } -} - -#[cfg(test)] -mod tests { - #[test] - #[cfg(feature = "geoparquet")] - fn geoparquet_compression() { - let format: super::Format = "geoparquet[snappy]".parse().unwrap(); - assert_eq!( - format, - super::Format::Geoparquet(parquet::basic::Compression::SNAPPY) - ); - } + let file = File::create(path).await?; + Ok(Box::pin(file)) } diff --git a/cli/src/value.rs b/cli/src/value.rs index 55dbd5dd6..ba108d7d7 100644 --- a/cli/src/value.rs +++ b/cli/src/value.rs @@ -1,6 +1,6 @@ -use crate::{output::Format, Error, Result}; -use bytes::Bytes; +use crate::{Error, Result}; use serde::Serialize; +use stac::Format; /// An output value, which can either be a [serde_json::Value] or a 
[stac::Value]. #[derive(Debug, Serialize)] @@ -14,79 +14,14 @@ pub enum Value { } impl Value { - pub(crate) fn into_bytes(self, format: Format) -> Result { + pub(crate) fn into_ndjson(self) -> Result> { match self { - Self::Stac(value) => match format { - Format::CompactJson => serde_json::to_vec(&value) - .map(Bytes::from) - .map_err(Error::from), - Format::PrettyJson => serde_json::to_vec_pretty(&value) - .map(Bytes::from) - .map_err(Error::from), - Format::NdJson => { - if let stac::Value::ItemCollection(item_collection) = value { - let mut buf = Vec::new(); - for item in item_collection.items { - serde_json::to_writer(&mut buf, &item)?; - buf.push(b'\n'); - } - Ok(buf.into()) - } else { - serde_json::to_vec(&value) - .map(Bytes::from) - .map_err(Error::from) - } - } - #[cfg(feature = "geoparquet")] - Format::Geoparquet(compression) => geoparquet_bytes(value, compression), - }, - Self::Json(value) => match format { - Format::CompactJson => serde_json::to_vec(&value) - .map(Bytes::from) - .map_err(Error::from), - Format::PrettyJson => serde_json::to_vec_pretty(&value) - .map(Bytes::from) - .map_err(Error::from), - Format::NdJson => { - if let serde_json::Value::Array(array) = value { - let mut buf = Vec::new(); - for value in array { - serde_json::to_writer(&mut buf, &value)?; - buf.push(b'\n'); - } - Ok(buf.into()) - } else { - serde_json::to_vec(&value) - .map(Bytes::from) - .map_err(Error::from) - } - } - #[cfg(feature = "geoparquet")] - Format::Geoparquet(compression) => { - geoparquet_bytes(serde_json::from_value(value)?, compression) - } - }, + Value::Json(value) => Format::NdJson.json_to_vec(value).map_err(Error::from), + Value::Stac(value) => Format::NdJson.value_to_vec(value).map_err(Error::from), } } } -#[cfg(feature = "geoparquet")] -fn geoparquet_bytes(value: stac::Value, compression: parquet::basic::Compression) -> Result { - tracing::debug!( - "converting STAC {} to geoparquet bytes using {} compression", - value.type_name(), - compression - ); - 
let mut options = geoarrow::io::parquet::GeoParquetWriterOptions::default(); - let writer_properties = parquet::file::properties::WriterProperties::builder() - .set_compression(compression) - .build(); - options.writer_properties = Some(writer_properties); - let mut bytes = Vec::new(); - stac::geoparquet::to_writer_with_options(&mut bytes, value, &options)?; - Ok(bytes.into()) -} - impl From for Value { fn from(value: stac::Value) -> Self { Value::Stac(value) diff --git a/core/Cargo.toml b/core/Cargo.toml index fe1cc63fa..09367f2e4 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -21,7 +21,7 @@ geoarrow = [ "dep:arrow-schema", "dep:geo-types", ] -geoparquet = ["geoarrow/parquet", "dep:parquet"] +geoparquet = ["geoarrow", "geoarrow/parquet", "dep:parquet", "dep:bytes"] geoparquet-compression = [ "geoparquet", "geoarrow/parquet_compression", @@ -31,7 +31,17 @@ geoparquet-compression = [ "parquet/lz4", "parquet/zstd", ] -object_store = ["dep:object_store"] +object-store = ["dep:object_store", "dep:send-future"] +object-store-aws = ["object-store", "object_store/aws"] +object-store-azure = ["object-store", "object_store/azure"] +object-store-gcp = ["object-store", "object_store/gcp"] +object-store-http = ["object-store", "object_store/http"] +object-store-full = [ + "object-store-aws", + "object-store-azure", + "object-store-gcp", + "object-store-http", +] reqwest = ["dep:reqwest"] [dependencies] @@ -39,6 +49,7 @@ arrow-array = { version = "52", optional = true } arrow-cast = { version = "52", optional = true } arrow-json = { version = "52", optional = true } arrow-schema = { version = "52", optional = true } +bytes = { version = "1", optional = true } chrono = { version = "0.4", features = ["serde"] } gdal = { version = "0.17", optional = true } gdal-sys = { version = "0.10", optional = true } @@ -51,6 +62,7 @@ mime = "0.3" object_store = { version = "0.11", optional = true } parquet = { version = "52", default-features = false, optional = true } reqwest = { 
version = "0.12", optional = true, features = ["json", "blocking"] } +send-future = { version = "0.1", optional = true } # https://github.com/rust-lang/rust/issues/96865 serde = { version = "1", features = ["derive"] } serde_json = { version = "1", features = ["preserve_order"] } thiserror = "1" diff --git a/core/README.md b/core/README.md index 65519aa2c..33c3535d2 100644 --- a/core/README.md +++ b/core/README.md @@ -113,12 +113,12 @@ It's support in **stac-rs** is currently experimental: } ``` -### object_store +### object-store -`object_store` adds traits to read and write from an [object_store](https://docs.rs/object_store/latest/object_store/): +`object-store` adds traits to read and write from an [object_store](https://docs.rs/object_store/latest/object_store/): ```rust -#[cfg(feature = "object_store")] +#[cfg(feature = "object-store")] { tokio_test::block_on(async { use object_store::{path::Path, local::LocalFileSystem}; diff --git a/core/src/error.rs b/core/src/error.rs index fdac7c6ba..271436548 100644 --- a/core/src/error.rs +++ b/core/src/error.rs @@ -101,9 +101,19 @@ pub enum Error { /// [object_store::Error] #[error(transparent)] - #[cfg(feature = "object_store")] + #[cfg(feature = "object-store")] ObjectStore(#[from] object_store::Error), + /// [object_store::path::Error] + #[error(transparent)] + #[cfg(feature = "object-store")] + ObjectStorePath(#[from] object_store::path::Error), + + /// [parquet::errors::ParquetError] + #[error(transparent)] + #[cfg(feature = "geoparquet")] + Parquet(#[from] parquet::errors::ParquetError), + /// Returned when trying to read from a url but the `reqwest` feature is not enabled. 
#[error("reqwest is not enabled")] ReqwestNotEnabled, diff --git a/core/src/format.rs b/core/src/format.rs index 9d86ddee0..9e3104db5 100644 --- a/core/src/format.rs +++ b/core/src/format.rs @@ -1,19 +1,22 @@ -use crate::{Error, Result}; +use crate::{Error, Result, Value}; +#[cfg(feature = "geoparquet")] +use parquet::basic::Compression; use std::{fmt::Display, str::FromStr}; /// The format of STAC data. -#[derive(Debug, Clone, Copy, Default, PartialEq)] +#[derive(Debug, Clone, Copy, PartialEq)] pub enum Format { /// JSON data (the default). - #[default] - Json, + /// + /// The boolean indicates whether it should be pretty-printed (true for pretty). + Json(bool), /// Newline-delimited JSON. NdJson, /// [stac-geoparquet](https://github.com/stac-utils/stac-geoparquet) #[cfg(feature = "geoparquet")] - Geoparquet, + Geoparquet(Option), } impl Format { @@ -24,21 +27,144 @@ impl Format { /// ``` /// use stac::Format; /// - /// assert_eq!(Format::Json, Format::infer_from_href("item.json").unwrap()); + /// assert_eq!(Format::Json(false), Format::infer_from_href("item.json").unwrap()); /// ``` pub fn infer_from_href(href: &str) -> Option { href.rsplit_once('.').and_then(|(_, ext)| ext.parse().ok()) } + + /// Sets this format to the pretty version, if possible. + /// + /// # Examples + /// + /// ``` + /// use stac::Format; + /// + /// assert_eq!(Format::Json(false).pretty(), Format::Json(true)); + /// ``` + pub fn pretty(self) -> Format { + if let Format::Json(_) = self { + Format::Json(true) + } else { + self + } + } + + /// Converts a [Value] to a vector of bytes. 
+ /// + /// # Examples + /// + /// ``` + /// use stac::{Format, Item}; + /// + /// let bytes = Format::Json(true).value_to_vec(Item::new("an-id")).unwrap(); + /// ``` + pub fn value_to_vec(&self, value: impl Into) -> Result> { + let value = value.into(); + match self { + Format::Json(pretty) => { + if *pretty { + serde_json::to_vec_pretty(&value).map_err(Error::from) + } else { + serde_json::to_vec(&value).map_err(Error::from) + } + } + Format::NdJson => { + if let Value::ItemCollection(item_collection) = value { + let mut buf = Vec::new(); + for item in &item_collection.items { + serde_json::to_writer(&mut buf, item)?; + buf.push(b'\n'); + } + Ok(buf) + } else { + serde_json::to_vec(&value).map_err(Error::from) + } + } + #[cfg(feature = "geoparquet")] + Format::Geoparquet(compression) => { + let mut options = geoarrow::io::parquet::GeoParquetWriterOptions::default(); + if let Some(compression) = compression { + let writer_properties = parquet::file::properties::WriterProperties::builder() + .set_compression(*compression) + .build(); + options.writer_properties = Some(writer_properties); + } + let mut bytes = Vec::new(); + crate::geoparquet::to_writer_with_options(&mut bytes, value, &options)?; + Ok(bytes) + } + } + } + + /// Converts a [serde_json::Value] to a vector of bytes. 
+ /// + /// # Examples + /// + /// ``` + /// use stac::Format; + /// + /// let data = serde_json::json!({ + /// "foo": "bar" + /// }); + /// let bytes = Format::Json(true).json_to_vec(data).unwrap(); + /// ``` + pub fn json_to_vec(&self, value: serde_json::Value) -> Result> { + match self { + Format::Json(pretty) => { + if *pretty { + serde_json::to_vec_pretty(&value).map_err(Error::from) + } else { + serde_json::to_vec(&value).map_err(Error::from) + } + } + Format::NdJson => { + if let serde_json::Value::Array(array) = value { + let mut buf = Vec::new(); + for value in &array { + serde_json::to_writer(&mut buf, value)?; + buf.push(b'\n'); + } + Ok(buf) + } else { + serde_json::to_vec(&value).map_err(Error::from) + } + } + #[cfg(feature = "geoparquet")] + Format::Geoparquet(_) => { + let item_collection = crate::ItemCollection::try_from(value)?; + self.value_to_vec(item_collection) + } + } + } +} + +impl Default for Format { + fn default() -> Self { + Self::Json(false) + } } impl Display for Format { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.write_str(match self { - Self::Json => "json", - Self::NdJson => "ndjson", + match self { + Self::Json(pretty) => { + if *pretty { + f.write_str("json-pretty") + } else { + f.write_str("json") + } + } + Self::NdJson => f.write_str("ndjson"), #[cfg(feature = "geoparquet")] - Self::Geoparquet => "geoparquet", - }) + Self::Geoparquet(compression) => { + if let Some(compression) = *compression { + write!(f, "geoparquet[{}]", compression) + } else { + f.write_str("geoparquet") + } + } + } } } @@ -47,11 +173,30 @@ impl FromStr for Format { fn from_str(s: &str) -> Result { match s.to_ascii_lowercase().as_str() { - "json" | "geojson" => Ok(Self::Json), + "json" | "geojson" => Ok(Self::Json(false)), + "json-pretty" | "geojson-pretty" => Ok(Self::Json(true)), "ndjson" => Ok(Self::NdJson), - #[cfg(feature = "geoparquet")] - "geoparquet" | "parquet" => Ok(Self::Geoparquet), - _ => 
Err(Error::UnsupportedFormat(s.to_string())), + _ => { + if s.starts_with("parquet") || s.starts_with("geoparquet") { + #[cfg(feature = "geoparquet")] + if let Some((_, compression)) = s.split_once('[') { + if let Some(stop) = compression.find(']') { + Ok(Self::Geoparquet(Some(compression[..stop].parse()?))) + } else { + Err(Error::UnsupportedFormat(s.to_string())) + } + } else { + Ok(Self::Geoparquet(None)) + } + #[cfg(not(feature = "geoparquet"))] + { + log::warn!("{} has a geoparquet extension, but the geoparquet feature is not enabled", s); + Err(Error::UnsupportedFormat(s.to_string())) + } + } else { + Err(Error::UnsupportedFormat(s.to_string())) + } + } } } } @@ -63,7 +208,17 @@ mod tests { fn parse_geoparquet() { assert_eq!( "parquet".parse::().unwrap(), - super::Format::Geoparquet + super::Format::Geoparquet(None) + ); + } + + #[test] + #[cfg(feature = "geoparquet")] + fn parse_geoparquet_compression() { + let format: super::Format = "geoparquet[snappy]".parse().unwrap(); + assert_eq!( + format, + super::Format::Geoparquet(Some(parquet::basic::Compression::SNAPPY)) ); } } diff --git a/core/src/io.rs b/core/src/io.rs index 09b0f315d..e6450beca 100644 --- a/core/src/io.rs +++ b/core/src/io.rs @@ -1,13 +1,21 @@ //! Input and output (IO) functions. -use crate::{Href, Result}; +use crate::{Error, Format, Href, Result}; #[cfg(feature = "reqwest")] use reqwest::blocking::Response; use serde::de::DeserializeOwned; use std::{fs::File, path::Path}; use url::Url; +#[cfg(feature = "object-store")] +use { + crate::object_store::Get, + object_store::{ + buffered::BufWriter, local::LocalFileSystem, path::Path as ObjectStorePath, ObjectStore, + }, + std::sync::Arc, +}; -/// Reads any STAC object from an href. +/// Reads any STAC value from an href. 
/// /// If the `geoparquet` feature is enabled, and the href's extension is /// `geoparquet` or `parquet`, the data will be read as @@ -23,20 +31,283 @@ use url::Url; /// let item: stac::Item = stac::read("examples/simple-item.json").unwrap(); /// ``` pub fn read(href: impl ToString) -> Result { - let href = href.to_string(); - if crate::geoparquet::has_extension(&href) { - #[cfg(feature = "geoparquet")] + Config::new().read(href) +} + +/// Gets any STAC value from an href. +/// +/// This is an asynchronous function that uses +/// [object_store](https://docs.rs/object_store/latest/object_store/) via the +/// `object-store` feature. The `object_store` feature only includes local +/// filesystem storage, so to read from http endpoints or a cloud store, you +/// need to enable the corresponding feature (`object-store-http`, +/// `object-store-aws`, `object-store-gcp`, or `object-store-azure`). +/// +/// For more control over access, e.g. to pass options to the underlying +/// **object_store**, use [Config]. +/// +/// # Examples +/// +/// ``` +/// # tokio_test::block_on(async { +/// #[cfg(feature = "object-store")] +/// { +/// let item: stac::Item = stac::get("examples/simple-item.json").await.unwrap(); +/// } +/// # }) +/// ``` +#[cfg(feature = "object-store")] +pub async fn get(href: impl ToString) -> Result { + Config::new().get(href.to_string()).await +} + +/// Configuration for reading and writing STAC values. +#[derive(Debug, Default, Clone)] +pub struct Config { + format: Option, + options: Vec, +} + +/// A key and a value. +pub type KeyValue = (String, String); + +impl Config { + /// Creates a new reader. + /// + /// # Examples + /// + /// ``` + /// use stac::io::Config; + /// + /// let config = Config::new(); + /// ``` + pub fn new() -> Config { + Config { + format: None, + options: Vec::new(), + } + } + + /// Sets the format for this Config. + /// + /// If the format is not set, it will be inferred from the href's extension, + /// falling back to JSON. 
+ /// + /// # Examples + /// + /// ``` + /// use stac::{io::Config, Format}; + /// + /// let config = Config::new().format(Format::NdJson); + /// ``` + pub fn format(mut self, format: impl Into>) -> Config { + self.format = format.into(); + self + } + + /// Sets the options for this config. + /// + /// The options are used to configure the object store, if one is used. + /// + /// # Examples + /// + /// ``` + /// use stac::io::Config; + /// + /// let config = Config::new().options(vec![("foo".to_string(), "bar".to_string())]); + /// ``` + pub fn options(mut self, options: impl Into>) -> Config { + self.options = options.into(); + self + } + + /// Reads a STAC value from an href. + /// + /// This is a synchronous operation that can only read from the local + /// filesystem or, if the `reqwest` feature is enabled, from `http` and + /// `https` hrefs. If you need support for cloud storage (e.g. aws, azure, + /// or gcp), use the asynchronous [Config::get]. + /// + /// # Examples + /// + /// ``` + /// use stac::{io::Config, Item}; + /// + /// let config = Config::new(); + /// let item: Item = config.read("examples/simple-item.json").unwrap(); + /// ``` + pub fn read(&self, href: impl ToString) -> Result + where + T: DeserializeOwned + Href, + { + let href = href.to_string(); + match self + .format + .unwrap_or_else(|| Format::infer_from_href(&href).unwrap_or_default()) { - serde_json::from_value(serde_json::to_value(crate::geoparquet::read(href)?)?) - .map_err(crate::Error::from) + Format::Json(_) => crate::json::read(href), + Format::NdJson => { + serde_json::from_value(serde_json::to_value(crate::ndjson::read(href)?)?) + .map_err(Error::from) + } + #[cfg(feature = "geoparquet")] + Format::Geoparquet(_) => { + serde_json::from_value(serde_json::to_value(crate::geoparquet::read(href)?)?) + .map_err(Error::from) + } + } + } + + /// Reads a STAC value from a [Read]. 
+ /// + /// # Examples + /// + /// ``` + /// use stac::{io::Config, Item}; + /// use std::fs::File; + /// + /// let config = Config::new(); + /// let file = File::open("examples/simple-item.json").unwrap(); + /// let item: Item = config.from_reader(file).unwrap(); + /// ``` + #[cfg_attr(not(feature = "geoparquet"), allow(unused_mut))] + pub fn from_reader(&self, mut read: impl std::io::Read) -> Result + where + T: DeserializeOwned + Href, + { + match self.format.unwrap_or_default() { + Format::Json(_) => serde_json::from_reader(read).map_err(Error::from), + Format::NdJson => serde_json::from_value(serde_json::to_value( + crate::ndjson::from_buf_reader(std::io::BufReader::new(read))?, + )?) + .map_err(Error::from), + #[cfg(feature = "geoparquet")] + Format::Geoparquet(_) => { + let mut buf = Vec::new(); + let _ = read.read_to_end(&mut buf)?; + serde_json::from_value(serde_json::to_value(crate::geoparquet::from_reader( + bytes::Bytes::from(buf), + )?)?) + .map_err(Error::from) + } } - #[cfg(not(feature = "geoparquet"))] + } + + /// Gets a STAC value from an href using **object_store**. 
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// use stac::{io::Config, Item};
+    ///
+    /// let config = Config::new();
+    /// # tokio_test::block_on(async {
+    /// let item: Item = config.get("examples/simple-item.json").await.unwrap();
+    /// # })
+    /// ```
+    #[cfg(feature = "object-store")]
+    pub async fn get(&self, href: impl ToString) -> Result {
+        use send_future::SendFuture as _;
+
+        // TODO make a `get_opts` to allow us to pass `GetOptions` in
+        let href = href.to_string();
+        let (object_store, path) = self.parse_href(&href)?;
+        let mut value: T = match self
+            .format
+            .unwrap_or_else(|| Format::infer_from_href(&href).unwrap_or_default())
+        {
+            Format::Json(_) => object_store.get_json(&path).send().await?,
+            Format::NdJson => serde_json::from_value(serde_json::to_value(
+                object_store.get_ndjson(&path).send().await?,
+            )?)?,
+            #[cfg(feature = "geoparquet")]
+            Format::Geoparquet(_) => serde_json::from_value(serde_json::to_value(
+                object_store.get_geoparquet(&path).send().await?,
+            )?)?,
+        };
+        value.set_href(href);
+        Ok(value)
+    }
+
+    /// Gets an [ItemCollection] from an href using **object_store**.
+    ///
+    /// Use this method when you know you're getting an item collection, e.g. if
+    /// you're reading
+    /// [stac-geoparquet](https://github.com/stac-utils/stac-geoparquet). 
+ /// + /// # Examples + /// + /// ``` + /// use stac::io::Config; + /// + /// let config = Config::new(); + /// # tokio_test::block_on(async { + /// #[cfg(feature = "geoparquet")] { + /// let item_collection = config.get_item_collection("data/extended-item.parquet").await.unwrap(); + /// } + /// # }) + /// ``` + #[cfg(feature = "object-store")] + pub async fn get_item_collection(&self, href: impl ToString) -> Result { + let href = href.to_string(); + let (object_store, path) = self.parse_href(&href)?; + let mut item_collection = match self + .format + .unwrap_or_else(|| Format::infer_from_href(&href).unwrap_or_default()) { - log::warn!("{} has a geoparquet extension, but this crate was not built with the `geoparquet` feature. Reading as JSON.", href); - crate::json::read(href) + Format::Json(_) => object_store.get_json(&path).await?, + Format::NdJson => object_store.get_ndjson(&path).await?, + #[cfg(feature = "geoparquet")] + Format::Geoparquet(_) => object_store.get_geoparquet(&path).await?, + }; + item_collection.set_href(href); + Ok(item_collection) + } + + /// Creates an [object_store::buffered::BufWriter] for the provided url. + /// + /// # Examples + /// + /// ```no_run + /// use stac::io::Config; + /// + /// let config = Config::new(); + /// let buf_writer = config.buf_writer(&"s3://stac/item.json".parse().unwrap()).unwrap(); + /// ``` + #[cfg(feature = "object-store")] + pub fn buf_writer(&self, url: &Url) -> Result { + let (object_store, path) = object_store::parse_url_opts(url, self.iter_options())?; + Ok(BufWriter::new(Arc::new(object_store), path)) + } + + /// Creates an [object_store::ObjectStore] and path for the provided url. 
+ /// + /// # Examples + /// + /// ```no_run + /// use stac::io::Config; + /// + /// let config = Config::new(); + /// let (store, path) = config.object_store(&"s3://stac/item.json".parse().unwrap()).unwrap(); + /// ``` + #[cfg(feature = "object-store")] + pub fn object_store(&self, url: &Url) -> Result<(Box, ObjectStorePath)> { + object_store::parse_url_opts(url, self.iter_options()).map_err(Error::from) + } + + #[cfg(feature = "object-store")] + fn parse_href(&self, href: &str) -> Result<(Box, ObjectStorePath)> { + if let Ok(url) = Url::parse(href) { + object_store::parse_url_opts(&url, self.iter_options()).map_err(Error::from) + } else { + let path = ObjectStorePath::from_filesystem_path(href)?; + Ok((Box::new(LocalFileSystem::new()), path)) } - } else { - crate::json::read(href) + } + + #[cfg(feature = "object-store")] + fn iter_options(&self) -> impl Iterator { + self.options.iter().map(|(k, v)| (k.as_str(), v.as_str())) } } @@ -70,7 +341,7 @@ pub(crate) trait Read { #[cfg(not(feature = "reqwest"))] fn read_from_url(_: Url) -> Result { - Err(crate::Error::ReqwestNotEnabled) + Err(Error::ReqwestNotEnabled) } } @@ -79,8 +350,9 @@ mod tests { use crate::{Catalog, Collection, Item, ItemCollection}; macro_rules! read { - ($function:ident, $filename:expr, $value:ty) => { + ($function:ident, $filename:expr, $value:ty $(, $meta:meta)?) => { #[test] + $(#[$meta])? 
fn $function() { use crate::Href; @@ -146,4 +418,13 @@ mod tests { fn read_geoparquet() { let _ = super::read::("data/extended-item.parquet").unwrap_err(); } + + #[tokio::test] + #[cfg(feature = "object-store")] + async fn get() { + use crate::Href; + + let item: Item = super::get("examples/simple-item.json").await.unwrap(); + assert!(item.href().unwrap().ends_with("examples/simple-item.json")); + } } diff --git a/core/src/item_collection.rs b/core/src/item_collection.rs index 5ff39fd69..18170fd7a 100644 --- a/core/src/item_collection.rs +++ b/core/src/item_collection.rs @@ -1,4 +1,4 @@ -use crate::{Href, Item, Link, Links, Migrate}; +use crate::{Error, Href, Item, Link, Links, Migrate}; use serde::{Deserialize, Serialize}; use serde_json::{Map, Value}; use std::{ops::Deref, vec::IntoIter}; @@ -119,6 +119,27 @@ impl Migrate for ItemCollection { } } +impl TryFrom for ItemCollection { + type Error = Error; + + fn try_from(value: Value) -> Result { + match serde_json::from_value::(value.clone()) { + Ok(item_collection) => Ok(item_collection), + Err(err) => { + if let Value::Array(array) = value { + let mut items = Vec::new(); + for item in array { + items.push(serde_json::from_value(item)?); + } + Ok(items.into()) + } else { + Err(Error::from(err)) + } + } + } + } +} + #[cfg(test)] mod tests { use super::ItemCollection; diff --git a/core/src/lib.rs b/core/src/lib.rs index 7fba3bec5..e5a2d5c6a 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -73,7 +73,7 @@ //! to get and put STAC values to and from an [object_store](https://docs.rs/object_store/latest/object_store/): //! //! ``` -//! #[cfg(feature = "object_store")] +//! #[cfg(feature = "object-store")] //! { //! use object_store::{path::Path, local::LocalFileSystem}; //! 
use stac::{Item, object_store::Get}; @@ -160,12 +160,14 @@ pub mod link; mod migrate; pub mod mime; pub mod ndjson; -#[cfg(feature = "object_store")] +#[cfg(feature = "object-store")] pub mod object_store; mod statistics; mod value; mod version; +#[cfg(feature = "object-store")] +pub use io::get; pub use { asset::{Asset, Assets}, band::Band, @@ -246,7 +248,7 @@ mod tests { #[cfg(not(feature = "geoparquet"))] use bytes as _; use rstest as _; - #[cfg(not(feature = "object_store"))] + #[cfg(not(feature = "object-store"))] use tokio as _; use tokio_test as _; diff --git a/core/src/object_store.rs b/core/src/object_store.rs index 6cf19545d..0af0e5eb9 100644 --- a/core/src/object_store.rs +++ b/core/src/object_store.rs @@ -1,14 +1,18 @@ //! Use [object_store](https://docs.rs/object_store/latest/object_store/) to read and write STAC. +#![allow(async_fn_in_trait)] + use crate::{Error, Href, Item, ItemCollection, Result}; #[cfg(feature = "geoparquet")] use geoarrow::io::parquet::GeoParquetWriterOptions; use object_store::{path::Path, GetOptions, ObjectStore, PutOptions, PutResult}; use serde::{de::DeserializeOwned, Serialize}; -use std::future::Future; /// Get STAC from an object store. -pub trait Get { +/// +/// These traits are not intended to be implemented outside of this crate, hence +/// we allow the `async_fn_in_trait` nit. +pub trait Get: ObjectStore { /// Gets STAC from JSON in an object store. /// /// # Examples @@ -23,19 +27,16 @@ pub trait Get { /// let item: Item = store.get_json(&location).await.unwrap(); /// # }) /// ``` - fn get_json( - &self, - location: &Path, - ) -> impl Future> { - self.get_json_opts(location, GetOptions::default()) + async fn get_json(&self, location: &Path) -> Result { + self.get_json_opts(location, GetOptions::default()).await } /// Gets STAC from JSON in an object store with options. 
- fn get_json_opts( + async fn get_json_opts( &self, location: &Path, options: GetOptions, - ) -> impl Future>; + ) -> Result; /// Gets an [ItemCollection] from newline-delimited JSON in an object store. /// @@ -51,16 +52,13 @@ pub trait Get { /// let items = store.get_ndjson(&location).await.unwrap(); /// # }) /// ``` - fn get_ndjson(&self, location: &Path) -> impl Future> { - self.get_ndjson_opts(location, GetOptions::default()) + async fn get_ndjson(&self, location: &Path) -> Result { + self.get_ndjson_opts(location, GetOptions::default()).await } /// Gets an [ItemCollection] from newline-delimited JSON in an object store with options. - fn get_ndjson_opts( - &self, - location: &Path, - options: GetOptions, - ) -> impl Future>; + async fn get_ndjson_opts(&self, location: &Path, options: GetOptions) + -> Result; /// Gets an [ItemCollection] from geoparquet in an object store. /// @@ -77,21 +75,25 @@ pub trait Get { /// # }) /// ``` #[cfg(feature = "geoparquet")] - fn get_geoparquet(&self, location: &Path) -> impl Future> { + async fn get_geoparquet(&self, location: &Path) -> Result { self.get_geoparquet_opts(location, GetOptions::default()) + .await } /// Gets an [ItemCollection] from geoparquet in an object store with options. #[cfg(feature = "geoparquet")] - fn get_geoparquet_opts( + async fn get_geoparquet_opts( &self, location: &Path, options: GetOptions, - ) -> impl Future>; + ) -> Result; } /// Puts STAC to an object store. -pub trait Put { +/// +/// These traits are not intended to be implemented outside of this crate, hence +/// we allow the `async_fn_in_trait` nit. +pub trait Put: ObjectStore { /// Puts STAC to JSON in an object store. 
/// /// # Examples @@ -107,21 +109,18 @@ pub trait Put { /// let _ = store.put_json(&location, &item).await.unwrap(); /// # }) /// ``` - fn put_json( - &self, - location: &Path, - value: &T, - ) -> impl Future> { + async fn put_json(&self, location: &Path, value: &T) -> Result { self.put_json_opts(location, value, PutOptions::default()) + .await } /// Puts STAC to JSON in an object store. - fn put_json_opts( + async fn put_json_opts( &self, location: &Path, value: &T, options: PutOptions, - ) -> impl Future>; + ) -> Result; /// Puts an [ItemCollection] as newline-delimited JSON in an object store. /// @@ -138,21 +137,22 @@ pub trait Put { /// let _ = store.put_ndjson(&location, &vec![item].into()).await.unwrap(); /// # }) /// ``` - fn put_ndjson( + async fn put_ndjson( &self, location: &Path, item_collection: &ItemCollection, - ) -> impl Future> { + ) -> Result { self.put_ndjson_opts(location, item_collection, PutOptions::default()) + .await } /// Puts an [ItemCollection] as newline-delimited JSON in an object store. - fn put_ndjson_opts( + async fn put_ndjson_opts( &self, location: &Path, item_collection: &ItemCollection, options: PutOptions, - ) -> impl Future>; + ) -> Result; /// Puts an [ItemCollection] as geoparquet in an object store. /// @@ -170,28 +170,29 @@ pub trait Put { /// # }) /// ``` #[cfg(feature = "geoparquet")] - fn put_geoparquet( + async fn put_geoparquet( &self, location: &Path, item_collection: ItemCollection, - ) -> impl Future> { + ) -> Result { self.put_geoparquet_opts( location, item_collection, GeoParquetWriterOptions::default(), PutOptions::default(), ) + .await } /// Puts an [ItemCollection] as geoparquet in an object store with options. #[cfg(feature = "geoparquet")] - fn put_geoparquet_opts( + async fn put_geoparquet_opts( &self, location: &Path, item_collection: ItemCollection, geoparquet_writer_options: GeoParquetWriterOptions, put_options: PutOptions, - ) -> impl Future>; + ) -> Result; } impl Get for O {