Skip to content

Commit

Permalink
feat(core, cli): move i/o to core
Browse files Browse the repository at this point in the history
  • Loading branch information
gadomski committed Sep 13, 2024
1 parent 9b0307b commit 4d497f8
Show file tree
Hide file tree
Showing 19 changed files with 706 additions and 438 deletions.
8 changes: 3 additions & 5 deletions .github/workflows/core.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,12 @@ jobs:
run: cargo check -F geoarrow
- name: Check w/ geoparquet
run: cargo check -F geoparquet
- name: Check w/ geoparquet-compression
run: cargo check -F geoparquet-compression
- name: Check w/ object_store
run: cargo check -F object_store
- name: Check w/ object-store
run: cargo check -F object-store
- name: Check w/ reqwest
run: cargo check -F reqwest
- name: Test
run: cargo test -F geo -F geoarrow -F geoparquet -F geoparquet-compression -F reqwest -F object_store
run: cargo test -F geo -F geoparquet-compression -F reqwest -F object-store-full
test-core-with-gdal:
runs-on: ubuntu-latest
steps:
Expand Down
21 changes: 13 additions & 8 deletions cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,29 +15,34 @@ rust-version = "1.75"
default = ["gdal", "geoparquet", "pgstac"]
duckdb = ["dep:stac-duckdb", "dep:duckdb"]
gdal = ["stac/gdal"]
geoparquet = ["dep:geoarrow", "stac/geoparquet-compression", "parquet"]
geoparquet = ["stac/geoparquet-compression"]
pgstac = ["stac-server/pgstac"]
python = ["dep:pyo3", "pgstac", "geoparquet"]

[dependencies]
axum = "0.7"
bytes = { version = "1" }
clap = { version = "4", features = ["derive"] }
duckdb = { version = "1", optional = true } # We have this dependency only to allow us to bundle it
geoarrow = { version = "0.3", optional = true }
object_store = { version = "0.11", features = ["aws", "gcp", "azure", "http"] }
parquet = { version = "52", optional = true }
duckdb = { version = "1", optional = true } # We have this dependency only to allow us to bundle it
object_store = "0.11"
pyo3 = { version = "0.22", optional = true }
reqwest = "0.12"
serde = "1"
serde_json = "1"
stac = { version = "0.9.0", path = "../core", features = ["reqwest"] }
stac = { version = "0.9.0", path = "../core", features = [
"reqwest",
"object-store-full",
] }
stac-api = { version = "0.5.0", path = "../api", features = ["client"] }
stac-duckdb = { version = "0.0.1", path = "../duckdb", optional = true }
stac-server = { version = "0.2.0", path = "../server", features = ["axum"] }
stac-validate = { version = "0.2.2", path = "../validate" }
thiserror = "1"
tokio = { version = "1.23", features = ["macros", "io-std", "rt-multi-thread"] }
tokio = { version = "1.23", features = [
"macros",
"io-std",
"rt-multi-thread",
"fs",
] }
tokio-stream = "0.1"
tracing = "0.1"
tracing-subscriber = "0.3"
Expand Down
2 changes: 1 addition & 1 deletion cli/src/args/items.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ impl Run for Args {
let mut join_set = JoinSet::new();
let mut items = Vec::with_capacity(self.hrefs.len());
for href in self.hrefs {
let input = input.with_href(&href)?;
let input = input.with_href(href.clone());
let sender = stream.clone();
let args = ItemArgs {
id_or_href: href,
Expand Down
45 changes: 28 additions & 17 deletions cli/src/args/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ mod serve;
mod translate;
mod validate;

use crate::{config::Entry, input::Input, output::Output, Result, Value};
use crate::{input::Input, options::KeyValue, output::Output, Result, Value};
use clap::Parser;
use stac::Format;
use tokio::sync::mpsc::Sender;
use tokio::task::JoinHandle;
use tracing::info;
use tracing::metadata::Level;

const BUFFER: usize = 100;
Expand All @@ -25,19 +25,23 @@ const BUFFER: usize = 100;
pub struct Args {
/// The input format; if not provided, it will be inferred from the input file's extension, falling back to json
#[arg(short, long, global = true)]
input_format: Option<stac::Format>,
input_format: Option<Format>,

/// key=value pairs to use for the input object store
#[arg(short = 'k', long)]
input_config: Vec<Entry>,
#[arg(long = "input-option")]
input_options: Vec<KeyValue>,

/// The output format; if not provided, it will be inferred from the output file's extension, falling back to json
#[arg(short, long, global = true)]
output_format: Option<crate::output::Format>,
output_format: Option<Format>,

/// key=value pairs to use for the output object store
#[arg(short = 'c', long)]
output_config: Vec<Entry>,
#[arg(long = "output-option")]
output_options: Vec<KeyValue>,

/// key=value pairs to use for both the input and the output object store
#[arg(short = 'c', long = "option")]
options: Vec<KeyValue>,

/// If the output is a local file, create its parent directories before creating the file
#[arg(long, default_value_t = true)]
Expand Down Expand Up @@ -129,23 +133,30 @@ impl Args {
let input = Input::new(
self.subcommand.take_infile(),
self.input_format,
self.input_config,
)?;
self.options
.clone()
.into_iter()
.chain(self.input_options)
.collect::<Vec<_>>(),
);
let mut output = Output::new(
self.subcommand.take_outfile(),
self.output_format.or({
if self.stream {
Some(crate::output::Format::NdJson)
Some(Format::NdJson)
} else {
None
}
}),
self.output_config,
self.options
.into_iter()
.chain(self.output_options)
.collect::<Vec<_>>(),
self.create_parent_directories,
)
.await?;
let value = if self.stream {
if output.format != crate::output::Format::NdJson {
if output.format != Format::NdJson {
tracing::warn!(
"format was set to {}, but stream=true so re-setting to nd-json",
output.format
Expand All @@ -166,10 +177,10 @@ impl Args {
};
if let Some(value) = value {
if let Some(put_result) = output.put(value).await? {
info!(
"put result: etag={}, version={}",
put_result.e_tag.as_deref().unwrap_or("<none>"),
put_result.version.as_deref().unwrap_or("<none>")
tracing::info!(
"put result: etag={} version={}",
put_result.e_tag.unwrap_or_default(),
put_result.version.unwrap_or_default()
);
}
}
Expand Down
10 changes: 3 additions & 7 deletions cli/src/args/serve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,8 @@ impl Run for Args {
reading_from_stdin = true;
}
}
let input = input.with_href(&href)?;
let _ = join_set.spawn(async move {
let mut value = input.get().await?;
value.set_href(href);
Ok(value)
});
let input = input.with_href(href);
let _ = join_set.spawn(async move { input.get().await });
}
let mut item_join_set = JoinSet::new();
let mut collections = HashSet::new();
Expand All @@ -102,7 +98,7 @@ impl Run for Args {
collection.make_relative_links_absolute(href)?;
for link in collection.iter_item_links() {
let href = link.href.to_string();
let input = input.with_href(&href)?;
let input = input.with_href(href);
let _ = item_join_set.spawn(async move { input.get().await });
}
}
Expand Down
5 changes: 0 additions & 5 deletions cli/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,6 @@ pub enum Error {
#[error(transparent)]
ObjectStorePath(#[from] object_store::path::Error),

/// [parquet::errors::ParquetError]
#[error(transparent)]
#[cfg(feature = "geoparquet")]
Parquet(#[from] parquet::errors::ParquetError),

/// [reqwest::Error]
#[error(transparent)]
Reqwest(#[from] reqwest::Error),
Expand Down
117 changes: 20 additions & 97 deletions cli/src/input.rs
Original file line number Diff line number Diff line change
@@ -1,120 +1,43 @@
use crate::{config::Config, Error, Result};
use object_store::{local::LocalFileSystem, path::Path, ObjectStore};
use stac::{Format, Item, ItemCollection, Value};
use std::io::BufReader;
use url::Url;
use crate::{options::Options, Error, Result};
use stac::{io::Config, Format, Value};

/// The input to a CLI run.
#[derive(Debug, Default)]
pub(crate) struct Input {
format: Format,
reader: Reader,
config: Config,
}

#[derive(Debug, Default)]
enum Reader {
ObjectStore {
object_store: Box<dyn ObjectStore>,
path: Path,
},
#[default]
Stdin,
href: Option<String>,
}

impl Input {
/// Creates a new input.
pub(crate) fn new(
infile: impl Into<Option<String>>,
href: impl Into<Option<String>>,
format: impl Into<Option<Format>>,
config: impl Into<Config>,
) -> Result<Input> {
let infile = infile
options: impl Into<Options>,
) -> Input {
let href = href
.into()
.and_then(|infile| if infile == "-" { None } else { Some(infile) });
let format = format
.into()
.or_else(|| infile.as_deref().and_then(Format::infer_from_href))
.unwrap_or_default();
let config = config.into();
let reader = if let Some(infile) = infile {
let (object_store, path) = parse_href_opts(&infile, config.iter())?;
Reader::ObjectStore { object_store, path }
} else {
Reader::Stdin
};
Ok(Input {
format,
reader,
config,
})
.and_then(|href| if href == "-" { None } else { Some(href) });
let config = Config::new().format(format).options(options.into());
Input { config, href }
}

/// Creates a new input with the given href.
pub(crate) fn with_href(&self, href: &str) -> Result<Input> {
let (object_store, path) = parse_href_opts(href, self.config.iter())?;
let reader = Reader::ObjectStore { object_store, path };
Ok(Input {
format: self.format,
reader,
pub(crate) fn with_href(&self, href: impl Into<Option<String>>) -> Input {
Input {
config: self.config.clone(),
})
href: href.into(),
}
}

/// Gets a STAC value from the input.
pub(crate) async fn get(&self) -> Result<Value> {
tracing::debug!("getting {}", self.format);
match &self.reader {
Reader::ObjectStore { object_store, path } => {
let bytes = object_store.get(path).await?.bytes().await?;
match self.format {
Format::Json => serde_json::from_slice(&bytes).map_err(Error::from),
Format::NdJson => bytes
.split(|c| *c == b'\n')
.map(|line| serde_json::from_slice::<Item>(line).map_err(Error::from))
.collect::<Result<Vec<_>>>()
.map(ItemCollection::from)
.map(Value::from),
#[cfg(feature = "geoparquet")]
Format::Geoparquet => stac::geoparquet::from_reader(bytes)
.map(Value::from)
.map_err(Error::from),
}
}
Reader::Stdin => match self.format {
Format::Json => serde_json::from_reader(std::io::stdin()).map_err(Error::from),
Format::NdJson => stac::ndjson::from_buf_reader(BufReader::new(std::io::stdin()))
.map(Value::from)
.map_err(Error::from),
#[cfg(feature = "geoparquet")]
Format::Geoparquet => {
use std::io::Read;

let mut buf = Vec::new();
let _ = std::io::stdin().read_to_end(&mut buf)?;
stac::geoparquet::from_reader(bytes::Bytes::from(buf))
.map(Value::from)
.map_err(Error::from)
}
},
if let Some(href) = self.href.as_ref() {
self.config.get(href.clone()).await.map_err(Error::from)
} else {
self.config
.from_reader(std::io::stdin())
.map_err(Error::from)
}
}
}

pub(crate) fn parse_href_opts<I, K, V>(
href: &str,
options: I,
) -> Result<(Box<dyn ObjectStore>, Path)>
where
I: IntoIterator<Item = (K, V)>,
K: AsRef<str>,
V: Into<String>,
{
if let Ok(url) = Url::parse(href) {
object_store::parse_url_opts(&url, options).map_err(Error::from)
} else {
let path = Path::from_filesystem_path(href)?;
let object_store = LocalFileSystem::new();
Ok((Box::new(object_store), path))
}
}
4 changes: 2 additions & 2 deletions cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
//! ```
#![cfg_attr(docsrs, feature(doc_auto_cfg))]
#![warn(
#![deny(
elided_lifetimes_in_paths,
explicit_outlives_requirements,
keyword_idents,
Expand Down Expand Up @@ -44,9 +44,9 @@
)]

mod args;
mod config;
mod error;
mod input;
mod options;
mod output;
#[cfg(feature = "python")]
mod python;
Expand Down
Loading

0 comments on commit 4d497f8

Please sign in to comment.