diff --git a/Cargo.toml b/Cargo.toml
index 803eabd345e4..dfd00ef6358d 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -51,6 +51,5 @@ features = [
   "compute_filter",
   "compute_if_then_else",
 ]
-
 [patch.crates-io]
 # packed_simd_2 = { git = "https://github.com/rust-lang/packed_simd", rev = "e57c7ba11386147e6d2cbad7c88f376aab4bdc86" }
diff --git a/examples/read_parquet_cloud/Cargo.toml b/examples/read_parquet_cloud/Cargo.toml
index cd70308319a9..4ebee5b0e64e 100644
--- a/examples/read_parquet_cloud/Cargo.toml
+++ b/examples/read_parquet_cloud/Cargo.toml
@@ -7,4 +7,4 @@ edition = "2021"
 
 [dependencies]
 aws-creds = "0.34.0"
-polars = { path = "../../polars", features = ["lazy", "parquet-async"] }
+polars = { path = "../../polars", features = ["lazy", "aws"] }
diff --git a/examples/read_parquet_cloud/src/main.rs b/examples/read_parquet_cloud/src/main.rs
index 0214bc154ce4..e179266e1de3 100644
--- a/examples/read_parquet_cloud/src/main.rs
+++ b/examples/read_parquet_cloud/src/main.rs
@@ -1,6 +1,5 @@
-use std::env;
-
 use awscreds::Credentials;
+use cloud::AmazonS3ConfigKey as Key;
 use polars::prelude::*;
 
 // Login to your aws account and then copy the ../datasets/foods1.parquet file to your own bucket.
@@ -9,11 +8,16 @@ const TEST_S3: &str = "s3://lov2test/polars/datasets/*.parquet";
 
 fn main() -> PolarsResult<()> {
     let cred = Credentials::default().unwrap();
-    env::set_var("AWS_SECRET_ACCESS_KEY", cred.secret_key.unwrap());
-    env::set_var("AWS_DEFAULT_REGION", "us-west-2");
-    env::set_var("AWS_ACCESS_KEY_ID", cred.access_key.unwrap());
-    let df = LazyFrame::scan_parquet(TEST_S3, ScanArgsParquet::default())?
+    // Propagate the credentials and other cloud options.
+    let mut args = ScanArgsParquet::default();
+    let cloud_options = cloud::CloudOptions::default().with_aws([
+        (Key::AccessKeyId, &cred.access_key.unwrap()),
+        (Key::SecretAccessKey, &cred.secret_key.unwrap()),
+        (Key::Region, &"us-west-2".into()),
+    ]);
+    args.cloud_options = Some(cloud_options);
+    let df = LazyFrame::scan_parquet(TEST_S3, args)?
        .with_streaming(true)
        .select([
            // select all columns
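The same shape should carry over to the other providers enabled by the new feature flags; a hedged Azure variant of the example above (account, key, and container are made up, and the `AzureConfigKey` variants are assumed from `object_store`):

```rust
use polars::prelude::*;
use polars::prelude::cloud::AzureConfigKey as Key;

fn scan_azure() -> PolarsResult<LazyFrame> {
    // Hypothetical credentials; requires the `azure` feature.
    let cloud_options = cloud::CloudOptions::default().with_azure([
        (Key::AccountName, "my-account"),
        (Key::AccessKey, "my-access-key"),
    ]);
    let mut args = ScanArgsParquet::default();
    args.cloud_options = Some(cloud_options);
    LazyFrame::scan_parquet("az://my-container/*.parquet", args)
}
```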
+"async" = ["url"] +"aws" = ["async", "object_store/aws"] +"azure" = ["async", "object_store/azure"] +"gcp" = ["async", "object_store/gcp"] + [dependencies] ahash.workspace = true anyhow.workspace = true @@ -161,6 +167,7 @@ hex = { version = "0.4", optional = true } indexmap = { version = "1", features = ["std"] } ndarray = { version = "0.15", optional = true, default_features = false } num.workspace = true +object_store = { version = "0.5.3", default-features = false, optional = true } once_cell = "1" polars-arrow = { version = "0.26.1", path = "../polars-arrow", features = ["compute"] } polars-utils = { version = "0.26.1", path = "../polars-utils" } @@ -173,6 +180,7 @@ serde = { version = "1", features = ["derive"], optional = true } serde_json = { version = "1", optional = true } smartstring = { version = "1" } thiserror.workspace = true +url = { version = "2.3.1", optional = true } xxhash-rust.workspace = true [target.'cfg(target_family = "wasm")'.dependencies] diff --git a/polars/polars-core/src/cloud.rs b/polars/polars-core/src/cloud.rs new file mode 100644 index 000000000000..bd9471823853 --- /dev/null +++ b/polars/polars-core/src/cloud.rs @@ -0,0 +1,234 @@ +use std::str::FromStr; + +#[cfg(feature = "aws")] +use object_store::aws::AmazonS3Builder; +#[cfg(feature = "aws")] +pub use object_store::aws::AmazonS3ConfigKey; +#[cfg(feature = "azure")] +pub use object_store::azure::AzureConfigKey; +#[cfg(feature = "azure")] +use object_store::azure::MicrosoftAzureBuilder; +#[cfg(feature = "gcp")] +use object_store::gcp::GoogleCloudStorageBuilder; +#[cfg(feature = "gcp")] +pub use object_store::gcp::GoogleConfigKey; +#[cfg(feature = "async")] +use object_store::ObjectStore; +#[cfg(feature = "serde-lazy")] +use serde::{Deserialize, Serialize}; +#[cfg(feature = "async")] +use url::Url; + +use crate::error::{PolarsError, PolarsResult}; + +/// The type of the config keys must satisfy the following requirements: +/// 1. must be easily collected into a HashMap, the type required by the object_crate API. +/// 2. be Serializable, required when the serde-lazy feature is defined. +/// 3. not actually use HashMap since that type is disallowed in Polars for performance reasons. +/// +/// Currently this type is a vector of pairs config key - config value. +#[allow(dead_code)] +type Configs = Vec<(T, String)>; + +#[derive(Clone, Debug, Default)] +#[cfg_attr(feature = "serde-lazy", derive(Serialize, Deserialize))] +/// Options to conect to various cloud providers. +pub struct CloudOptions { + #[cfg(feature = "aws")] + aws: Option>, + #[cfg(feature = "azure")] + azure: Option>, + #[cfg(feature = "gcp")] + gcp: Option>, +} + +#[allow(dead_code)] +/// Parse an untype configuration hashmap to a typed configuration for the given configuration key type. 
+fn parsed_untyped_config, impl Into)>>( + config: I, +) -> PolarsResult> +where + T: FromStr + std::cmp::Eq + std::hash::Hash, +{ + config + .into_iter() + .map(|(key, val)| { + T::from_str(key.as_ref()) + .map_err(|_e| { + PolarsError::ComputeError( + format!("Unknown configuration key {}.", key.as_ref()).into(), + ) + }) + .map(|typed_key| (typed_key, val.into())) + }) + .collect::>>() +} + +pub enum CloudType { + Aws, + Azure, + File, + Gcp, +} + +impl FromStr for CloudType { + type Err = PolarsError; + + #[cfg(feature = "async")] + fn from_str(url: &str) -> Result { + let parsed = Url::parse(url).map_err(anyhow::Error::from)?; + match parsed.scheme() { + "s3" => Ok(Self::Aws), + "az" | "adl" | "abfs" => Ok(Self::Azure), + "gs" | "gcp" => Ok(Self::Gcp), + "file" => Ok(Self::File), + &_ => Err(PolarsError::ComputeError("Unknown url scheme.".into())), + } + } + + #[cfg(not(feature = "async"))] + fn from_str(_s: &str) -> Result { + Err(PolarsError::ComputeError( + "At least one of the cloud features must be enabled.".into(), + )) + } +} + +impl CloudOptions { + /// Set the configuration for AWS connections. This is the preferred API from rust. + #[cfg(feature = "aws")] + pub fn with_aws)>>( + mut self, + configs: I, + ) -> Self { + self.aws = Some( + configs + .into_iter() + .map(|(k, v)| (k, v.into())) + .collect::>(), + ); + self + } + + /// Build the ObjectStore implementation for AWS. + #[cfg(feature = "aws")] + pub fn build_aws(&self, bucket_name: &str) -> PolarsResult { + let options = self.aws.as_ref().map(Ok).unwrap_or_else(|| { + Err(PolarsError::ComputeError( + "`aws` configuration missing.".into(), + )) + })?; + AmazonS3Builder::new() + .try_with_options(options.clone().into_iter()) + .and_then(|b| b.with_bucket_name(bucket_name).build()) + .map_err(|e| PolarsError::ComputeError(e.to_string().into())) + } + + /// Set the configuration for Azure connections. This is the preferred API from rust. + #[cfg(feature = "azure")] + pub fn with_azure)>>( + mut self, + configs: I, + ) -> Self { + self.azure = Some( + configs + .into_iter() + .map(|(k, v)| (k, v.into())) + .collect::>(), + ); + self + } + + /// Build the ObjectStore implementation for Azure. + #[cfg(feature = "azure")] + pub fn build_azure(&self, container_name: &str) -> PolarsResult { + let options = self.azure.as_ref().map(Ok).unwrap_or_else(|| { + Err(PolarsError::ComputeError( + "`azure` configuration missing.".into(), + )) + })?; + MicrosoftAzureBuilder::new() + .try_with_options(options.clone().into_iter()) + .and_then(|b| b.with_container_name(container_name).build()) + .map_err(|e| PolarsError::ComputeError(e.to_string().into())) + } + + /// Set the configuration for GCP connections. This is the preferred API from rust. + #[cfg(feature = "gcp")] + pub fn with_gcp)>>( + mut self, + configs: I, + ) -> Self { + self.gcp = Some( + configs + .into_iter() + .map(|(k, v)| (k, v.into())) + .collect::>(), + ); + self + } + + /// Build the ObjectStore implementation for GCP. + #[cfg(feature = "gcp")] + pub fn build_gcp(&self, bucket_name: &str) -> PolarsResult { + let options = self.gcp.as_ref().map(Ok).unwrap_or_else(|| { + Err(PolarsError::ComputeError( + "`gcp` configuration missing.".into(), + )) + })?; + GoogleCloudStorageBuilder::new() + .try_with_options(options.clone().into_iter()) + .and_then(|b| b.with_bucket_name(bucket_name).build()) + .map_err(|e| PolarsError::ComputeError(e.to_string().into())) + } + + /// Parse a configuration from a Hashmap. This is the interface from Python. 
+ #[allow(unused_variables)] + pub fn from_untyped_config, impl Into)>>( + url: &str, + config: I, + ) -> PolarsResult { + match CloudType::from_str(url)? { + CloudType::Aws => { + #[cfg(feature = "aws")] + { + parsed_untyped_config::(config) + .map(|aws| Self::default().with_aws(aws)) + } + #[cfg(not(feature = "aws"))] + { + Err(PolarsError::ComputeError( + "Feature aws is not enabled.".into(), + )) + } + } + CloudType::Azure => { + #[cfg(feature = "azure")] + { + parsed_untyped_config::(config) + .map(|azure| Self::default().with_azure(azure)) + } + #[cfg(not(feature = "azure"))] + { + Err(PolarsError::ComputeError( + "Feature gcp is not enabled.".into(), + )) + } + } + CloudType::File => Ok(Self::default()), + CloudType::Gcp => { + #[cfg(feature = "gcp")] + { + parsed_untyped_config::(config) + .map(|gcp| Self::default().with_gcp(gcp)) + } + #[cfg(not(feature = "gcp"))] + { + Err(PolarsError::ComputeError( + "Feature gcp is not enabled.".into(), + )) + } + } + } + } +} diff --git a/polars/polars-core/src/datatypes/any_value.rs b/polars/polars-core/src/datatypes/any_value.rs index d11448a9b3a5..b03d8c2bb9e9 100644 --- a/polars/polars-core/src/datatypes/any_value.rs +++ b/polars/polars-core/src/datatypes/any_value.rs @@ -653,7 +653,7 @@ impl PartialOrd for AnyValue<'_> { (Float64(l), Float64(r)) => l.partial_cmp(r), (Utf8(l), Utf8(r)) => l.partial_cmp(*r), #[cfg(feature = "dtype-binary")] - (Binary(l), Binary(r)) => l.partial_cmp(r), + (Binary(l), Binary(r)) => l.partial_cmp(*r), _ => None, } } diff --git a/polars/polars-core/src/datatypes/mod.rs b/polars/polars-core/src/datatypes/mod.rs index de38477e44ed..ebf1bb923ef3 100644 --- a/polars/polars-core/src/datatypes/mod.rs +++ b/polars/polars-core/src/datatypes/mod.rs @@ -260,3 +260,6 @@ impl PolarsIntegerType for Int64Type {} pub trait PolarsFloatType: PolarsNumericType {} impl PolarsFloatType for Float32Type {} impl PolarsFloatType for Float64Type {} + +// Provide options to cloud providers (credentials, region). 
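The untyped entry point is what the Python bindings feed a plain dict into; keys are matched against `object_store`'s config-key spellings. A hedged sketch (key names and bucket are assumptions):

```rust
use polars_core::cloud::CloudOptions;
use polars_core::error::PolarsResult;

fn options_from_strings() -> PolarsResult<CloudOptions> {
    // "aws_access_key_id" etc. are assumed to parse as AmazonS3ConfigKey;
    // unknown keys are rejected with a ComputeError.
    CloudOptions::from_untyped_config(
        "s3://my-bucket/path/*.parquet",
        [
            ("aws_access_key_id", "ACCESS_KEY"),
            ("aws_secret_access_key", "SECRET_KEY"),
            ("aws_region", "us-west-2"),
        ],
    )
}
```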
diff --git a/polars/polars-core/src/datatypes/any_value.rs b/polars/polars-core/src/datatypes/any_value.rs
index d11448a9b3a5..b03d8c2bb9e9 100644
--- a/polars/polars-core/src/datatypes/any_value.rs
+++ b/polars/polars-core/src/datatypes/any_value.rs
@@ -653,7 +653,7 @@ impl PartialOrd for AnyValue<'_> {
             (Float64(l), Float64(r)) => l.partial_cmp(r),
             (Utf8(l), Utf8(r)) => l.partial_cmp(*r),
             #[cfg(feature = "dtype-binary")]
-            (Binary(l), Binary(r)) => l.partial_cmp(r),
+            (Binary(l), Binary(r)) => l.partial_cmp(*r),
             _ => None,
         }
     }
diff --git a/polars/polars-core/src/datatypes/mod.rs b/polars/polars-core/src/datatypes/mod.rs
index de38477e44ed..ebf1bb923ef3 100644
--- a/polars/polars-core/src/datatypes/mod.rs
+++ b/polars/polars-core/src/datatypes/mod.rs
@@ -260,3 +260,6 @@ impl PolarsIntegerType for Int64Type {}
 pub trait PolarsFloatType: PolarsNumericType {}
 impl PolarsFloatType for Float32Type {}
 impl PolarsFloatType for Float64Type {}
+
+// Provide options to cloud providers (credentials, region).
+pub type CloudOptions = PlHashMap<String, String>;
diff --git a/polars/polars-core/src/lib.rs b/polars/polars-core/src/lib.rs
index 238df3fb4b07..9d7f47333569 100644
--- a/polars/polars-core/src/lib.rs
+++ b/polars/polars-core/src/lib.rs
@@ -4,6 +4,7 @@ extern crate core;
 #[macro_use]
 pub mod utils;
 pub mod chunked_array;
+pub mod cloud;
 pub(crate) mod config;
 pub mod datatypes;
 #[cfg(feature = "docs")]
diff --git a/polars/polars-core/src/prelude.rs b/polars/polars-core/src/prelude.rs
index 22dc4d85c449..f830a6095df8 100644
--- a/polars/polars-core/src/prelude.rs
+++ b/polars/polars-core/src/prelude.rs
@@ -51,4 +51,4 @@ pub use crate::testing::*;
 pub(crate) use crate::utils::CustomIterTools;
 pub use crate::utils::IntoVec;
 pub use crate::vector_hasher::{VecHash, VecHashSingle};
-pub use crate::{datatypes, df};
+pub use crate::{cloud, datatypes, df};
diff --git a/polars/polars-io/Cargo.toml b/polars/polars-io/Cargo.toml
index 36770a9ef524..949775e24346 100644
--- a/polars/polars-io/Cargo.toml
+++ b/polars/polars-io/Cargo.toml
@@ -31,7 +31,10 @@ dtype-binary = ["polars-core/dtype-binary"]
 fmt = ["polars-core/fmt"]
 lazy = []
 parquet = ["polars-core/parquet", "arrow/io_parquet", "arrow/io_parquet_compression", "memmap"]
-parquet-async = ["parquet", "async-trait", "futures", "object_store", "tokio", "url"]
+async = ["async-trait", "futures", "object_store", "tokio", "url"]
+aws = ["object_store/aws", "async", "polars-core/aws"]
+azure = ["object_store/azure", "async", "polars-core/azure"]
+gcp = ["object_store/gcp", "async", "polars-core/gcp"]
 partition = ["polars-core/partition_by"]
 temporal = ["dtype-datetime", "dtype-date", "dtype-time"]
 # don't use this
@@ -51,7 +54,7 @@ lexical-core = { version = "0.8", optional = true }
 memchr = "2.5"
 memmap = { package = "memmap2", version = "0.5.2", optional = true }
 num.workspace = true
-object_store = { version = "0.5.2", features = ["aws"], optional = true }
+object_store = { version = "0.5.3", default-features = false, optional = true }
 once_cell = "1"
 polars-arrow = { version = "0.26.1", path = "../polars-arrow" }
 polars-core = { version = "0.26.1", path = "../polars-core", features = ["private"], default-features = false }
diff --git a/polars/polars-io/src/cloud/adaptors.rs b/polars/polars-io/src/cloud/adaptors.rs
new file mode 100644
index 000000000000..2f7e2a949f7a
--- /dev/null
+++ b/polars/polars-io/src/cloud/adaptors.rs
@@ -0,0 +1,131 @@
+//! Interface with the object_store crate and define AsyncSeek, AsyncRead.
+//! This is used, for example, by the parquet2 crate.
+use std::io::{self};
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::Poll;
+
+use futures::executor::block_on;
+use futures::future::BoxFuture;
+use futures::lock::Mutex;
+use futures::{AsyncRead, AsyncSeek, Future, TryFutureExt};
+use object_store::path::Path;
+use object_store::ObjectStore;
+
+type OptionalFuture = Arc<Mutex<Option<BoxFuture<'static, std::io::Result<Vec<u8>>>>>>;
+
+/// Adaptor to translate from AsyncSeek and AsyncRead to the object_store get_range API.
+pub struct CloudReader {
+    // The current position in the stream, it is set by seeking and updated by reading bytes.
+    pos: u64,
+    // The total size of the object is required when seeking from the end of the file.
+    length: Option<u64>,
+    // Hold a reference to the store in a thread-safe way.
+    object_store: Arc<Mutex<Box<dyn ObjectStore>>>,
+    // The path in the object_store of the current object being read.
+    path: Path,
+    // If a read is pending then `active` will point to its future.
+    active: OptionalFuture,
+}
+
+impl CloudReader {
+    pub fn new(
+        length: Option<u64>,
+        object_store: Arc<Mutex<Box<dyn ObjectStore>>>,
+        path: Path,
+    ) -> Self {
+        Self {
+            pos: 0,
+            length,
+            object_store,
+            path,
+            active: Arc::new(Mutex::new(None)),
+        }
+    }
+
+    /// For each read request we create a new future.
+    async fn read_operation(
+        mut self: Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+        length: usize,
+    ) -> std::task::Poll<std::io::Result<Vec<u8>>> {
+        let start = self.pos as usize;
+
+        // If we already have a future just poll it.
+        if let Some(fut) = self.active.lock().await.as_mut() {
+            return Future::poll(fut.as_mut(), cx);
+        }
+
+        // Create the future.
+        let future = {
+            let path = self.path.clone();
+            let arc = self.object_store.clone();
+            // Use an async move block to get our owned objects.
+            async move {
+                let object_store = arc.lock().await;
+                object_store
+                    .get_range(&path, start..start + length)
+                    .map_ok(|r| r.to_vec())
+                    .map_err(|e| {
+                        std::io::Error::new(
+                            std::io::ErrorKind::Other,
+                            format!("object store error {e:?}"),
+                        )
+                    })
+                    .await
+            }
+        };
+        // Prepare for next read.
+        self.pos += length as u64;
+
+        let mut future = Box::pin(future);
+
+        // Need to poll it once to get the pump going.
+        let polled = Future::poll(future.as_mut(), cx);
+
+        // Save for next time.
+        let mut state = self.active.lock().await;
+        *state = Some(future);
+        polled
+    }
+}
+
+impl AsyncRead for CloudReader {
+    fn poll_read(
+        self: Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+        buf: &mut [u8],
+    ) -> std::task::Poll<std::io::Result<usize>> {
+        // Use block_on in order to get the future result in this thread and copy the data in the output buffer.
+        // With this approach we keep ownership of the buffer and we don't have to pass it to the future runtime.
+        match block_on(self.read_operation(cx, buf.len())) {
+            Poll::Ready(Ok(bytes)) => {
+                buf.copy_from_slice(&bytes);
+                Poll::Ready(Ok(bytes.len()))
+            }
+            Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
+            Poll::Pending => Poll::Pending,
+        }
+    }
+}
+
+impl AsyncSeek for CloudReader {
+    fn poll_seek(
+        mut self: Pin<&mut Self>,
+        _: &mut std::task::Context<'_>,
+        pos: io::SeekFrom,
+    ) -> std::task::Poll<io::Result<u64>> {
+        match pos {
+            io::SeekFrom::Start(pos) => self.pos = pos,
+            io::SeekFrom::End(pos) => {
+                let length = self.length.ok_or::<io::Error>(io::Error::new(
+                    std::io::ErrorKind::Other,
+                    "Cannot seek from end of stream when length is unknown.",
+                ))?;
+                self.pos = (length as i64 + pos) as u64
+            }
+            io::SeekFrom::Current(pos) => self.pos = (self.pos as i64 + pos) as u64,
+        };
+        std::task::Poll::Ready(Ok(self.pos))
+    }
+}
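Within the crate (the `cloud` module is private), the adaptor behaves like any `futures` reader once a store is built; a rough sketch, assuming a Tokio context so the internal `block_on` can block the worker thread (the store and path are placeholders):

```rust
use std::sync::Arc;

use futures::io::AsyncReadExt;
use futures::lock::Mutex;
use object_store::path::Path;
use object_store::ObjectStore;

use crate::cloud::CloudReader;

async fn read_magic(store: Box<dyn ObjectStore>) -> std::io::Result<[u8; 4]> {
    let path = Path::from("datasets/foods1.parquet"); // hypothetical object
    let mut reader = CloudReader::new(None, Arc::new(Mutex::new(store)), path);
    let mut buf = [0u8; 4]; // parquet files start with the "PAR1" magic bytes
    reader.read_exact(&mut buf).await?;
    Ok(buf)
}
```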
diff --git a/polars/polars-io/src/object_store.rs b/polars/polars-io/src/cloud/glob.rs
similarity index 62%
rename from polars/polars-io/src/object_store.rs
rename to polars/polars-io/src/cloud/glob.rs
index 714d84bc2208..344600f35514 100644
--- a/polars/polars-io/src/object_store.rs
+++ b/polars/polars-io/src/cloud/glob.rs
@@ -1,142 +1,13 @@
-//! Interface with the object_store crate and define AsyncSeek, AsyncRead.
-//! This is used, for example, by the parquet2 crate.
-
-use std::io::{self};
-use std::pin::Pin;
-use std::sync::Arc;
-use std::task::Poll;
-
-use futures::executor::block_on;
-use futures::future::{ready, BoxFuture};
-use futures::lock::Mutex;
-use futures::{AsyncRead, AsyncSeek, Future, StreamExt, TryFutureExt, TryStreamExt};
-use object_store::aws::AmazonS3Builder;
-use object_store::local::LocalFileSystem;
+use futures::future::ready;
+use futures::{StreamExt, TryStreamExt};
 use object_store::path::Path;
-use object_store::ObjectStore;
+use polars_core::cloud::CloudOptions;
 use polars_core::prelude::{PolarsError, PolarsResult};
 use regex::Regex;
 use url::Url;
 
-type OptionalFuture = Arc<Mutex<Option<BoxFuture<'static, std::io::Result<Vec<u8>>>>>>;
 const DELIMITER: char = '/';
 
-/// Adaptor to translate from AsyncSeek and AsyncRead to the object_store get_range API.
-pub struct CloudReader {
-    // The current position in the stream, it is set by seeking and updated by reading bytes.
-    pos: u64,
-    // The total size of the object is required when seeking from the end of the file.
-    length: Option<u64>,
-    // Hold an reference to the store in a thread safe way.
-    object_store: Arc<Mutex<Box<dyn ObjectStore>>>,
-    // The path in the object_store of the current object being read.
-    path: Path,
-    // If a read is pending then `active` will point to its future.
-    active: OptionalFuture,
-}
-
-impl CloudReader {
-    pub fn new(
-        length: Option<u64>,
-        object_store: Arc<Mutex<Box<dyn ObjectStore>>>,
-        path: Path,
-    ) -> Self {
-        Self {
-            pos: 0,
-            length,
-            object_store,
-            path,
-            active: Arc::new(Mutex::new(None)),
-        }
-    }
-
-    /// For each read request we create a new future.
-    async fn read_operation(
-        mut self: Pin<&mut Self>,
-        cx: &mut std::task::Context<'_>,
-        length: usize,
-    ) -> std::task::Poll<std::io::Result<Vec<u8>>> {
-        let start = self.pos as usize;
-
-        // If we already have a future just poll it.
-        if let Some(fut) = self.active.lock().await.as_mut() {
-            return Future::poll(fut.as_mut(), cx);
-        }
-
-        // Create the future.
-        let future = {
-            let path = self.path.clone();
-            let arc = self.object_store.clone();
-            // Use an async move block to get our owned objects.
-            async move {
-                let object_store = arc.lock().await;
-                object_store
-                    .get_range(&path, start..start + length)
-                    .map_ok(|r| r.to_vec())
-                    .map_err(|e| {
-                        std::io::Error::new(
-                            std::io::ErrorKind::Other,
-                            format!("object store error {e:?}"),
-                        )
-                    })
-                    .await
-            }
-        };
-        // Prepare for next read.
-        self.pos += length as u64;
-
-        let mut future = Box::pin(future);
-
-        // Need to poll it once to get the pump going.
-        let polled = Future::poll(future.as_mut(), cx);
-
-        // Save for next time.
-        let mut state = self.active.lock().await;
-        *state = Some(future);
-        polled
-    }
-}
-
-impl AsyncRead for CloudReader {
-    fn poll_read(
-        self: Pin<&mut Self>,
-        cx: &mut std::task::Context<'_>,
-        buf: &mut [u8],
-    ) -> std::task::Poll<std::io::Result<usize>> {
-        // Use block_on in order to get the future result in this thread and copy the data in the output buffer.
-        // With this approach we keep ownership of the buffer and we don't have to pass it to the future runtime.
-        match block_on(self.read_operation(cx, buf.len())) {
-            Poll::Ready(Ok(bytes)) => {
-                buf.copy_from_slice(&bytes);
-                Poll::Ready(Ok(bytes.len()))
-            }
-            Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
-            Poll::Pending => Poll::Pending,
-        }
-    }
-}
-
-impl AsyncSeek for CloudReader {
-    fn poll_seek(
-        mut self: Pin<&mut Self>,
-        _: &mut std::task::Context<'_>,
-        pos: io::SeekFrom,
-    ) -> std::task::Poll<io::Result<u64>> {
-        match pos {
-            io::SeekFrom::Start(pos) => self.pos = pos,
-            io::SeekFrom::End(pos) => {
-                let length = self.length.ok_or::<io::Error>(io::Error::new(
-                    std::io::ErrorKind::Other,
-                    "Cannot seek from end of stream when length is unknown.",
-                ))?;
-                self.pos = (length as i64 + pos) as u64
-            }
-            io::SeekFrom::Current(pos) => self.pos = (self.pos as i64 + pos) as u64,
-        };
-        std::task::Poll::Ready(Ok(self.pos))
-    }
-}
-
 /// Split the url in
 /// 1. the prefix part (all path components until the first one with '*')
 /// 2. a regular expression representation of the rest.
@@ -214,7 +85,7 @@ pub struct CloudLocation {
 
 impl CloudLocation {
     /// Parse a CloudLocation from an url.
-    fn new(url: &str) -> PolarsResult<CloudLocation> {
+    pub fn new(url: &str) -> PolarsResult<CloudLocation> {
         let parsed = Url::parse(url).map_err(anyhow::Error::from)?;
         let is_local = parsed.scheme() == "file";
         let (bucket, key) = if is_local {
@@ -247,26 +118,6 @@ fn full_url(scheme: &str, bucket: &str, key: Path) -> String {
     format!("{scheme}://{bucket}/{key}")
 }
 
-/// Build an ObjectStore based on the URL and information from the environment. Return an object store and the path relative to the store.
-pub fn build(url: &str) -> PolarsResult<(CloudLocation, Box<dyn ObjectStore>)> {
-    let cloud_location = CloudLocation::new(url)?;
-    let store = match cloud_location.scheme.as_str() {
-        "s3" => {
-            let s3 = AmazonS3Builder::from_env()
-                .with_bucket_name(&cloud_location.bucket)
-                .build()
-                .map_err(anyhow::Error::from)?;
-            Ok::<_, PolarsError>(Box::new(s3) as Box<dyn ObjectStore>)
-        }
-        "file" => {
-            let local = LocalFileSystem::new();
-            Ok::<_, PolarsError>(Box::new(local) as Box<dyn ObjectStore>)
-        }
-        _ => unimplemented!(),
-    }?;
-    Ok((cloud_location, store))
-}
-
 /// A simple matcher, if more is required consider depending on https://crates.io/crates/globset.
 /// The Cloud list api returns a list of all the file names under a prefix, there is no additional cost of `readdir`.
 struct Matcher {
@@ -300,7 +151,7 @@ impl Matcher {
 
 #[tokio::main(flavor = "current_thread")]
 /// List files with a prefix derived from the pattern.
-pub async fn glob(url: &str) -> PolarsResult<Vec<String>> {
+pub async fn glob(url: &str, cloud_options: Option<&CloudOptions>) -> PolarsResult<Vec<String>> {
     // Find the fixed prefix, up to the first '*'.
     let (
@@ -311,7 +162,7 @@ pub async fn glob(url: &str) -> PolarsResult<Vec<String>> {
             expansion,
         },
         store,
-    ) = build(url)?;
+    ) = super::build(url, cloud_options)?;
 
     let matcher = Matcher::new(prefix.clone(), expansion.as_deref())?;
     let list_stream = store
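Globbing is prefix-based: everything before the first `*` becomes the list prefix, and the remainder becomes a regex that keys are matched against. A small sketch of the exported entry point (bucket and pattern are placeholders; requires the `async` feature):

```rust
use polars_core::cloud::CloudOptions;
use polars_core::error::PolarsResult;

fn list_parquet_files(options: &CloudOptions) -> PolarsResult<Vec<String>> {
    // Lists keys under "datasets/" on the store and keeps those matching "*.parquet".
    // The #[tokio::main] attribute on `glob` makes this a blocking call.
    polars_io::async_glob("s3://my-bucket/datasets/*.parquet", Some(options))
}
```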
diff --git a/polars/polars-io/src/cloud/mod.rs b/polars/polars-io/src/cloud/mod.rs
new file mode 100644
index 000000000000..e3cb4d40a6aa
--- /dev/null
+++ b/polars/polars-io/src/cloud/mod.rs
@@ -0,0 +1,78 @@
+//! Interface with cloud storage through the object_store crate.
+
+use std::str::FromStr;
+
+use object_store::local::LocalFileSystem;
+use object_store::ObjectStore;
+use polars_core::cloud::{CloudOptions, CloudType};
+use polars_core::prelude::{PolarsError, PolarsResult};
+
+mod adaptors;
+mod glob;
+pub use adaptors::*;
+pub use glob::*;
+
+type BuildResult = PolarsResult<(CloudLocation, Box<dyn ObjectStore>)>;
+
+#[allow(dead_code)]
+fn err_missing_feature(feature: &str, scheme: &str) -> BuildResult {
+    Err(PolarsError::ComputeError(
+        format!("Feature '{feature}' must be enabled in order to use '{scheme}' cloud urls.")
+            .into(),
+    ))
+}
+fn err_missing_configuration(feature: &str, scheme: &str) -> BuildResult {
+    Err(PolarsError::ComputeError(
+        format!("Configuration '{feature}' must be provided in order to use {scheme} cloud urls.")
+            .into(),
+    ))
+}
+/// Build an ObjectStore based on the URL and the passed-in options. Return the cloud location and an implementation of the object store.
+pub fn build(url: &str, options: Option<&CloudOptions>) -> BuildResult {
+    let cloud_location = CloudLocation::new(url)?;
+    let store = match CloudType::from_str(url).map_err(anyhow::Error::from)? {
+        CloudType::File => {
+            let local = LocalFileSystem::new();
+            Ok::<_, PolarsError>(Box::new(local) as Box<dyn ObjectStore>)
+        }
+        CloudType::Aws => {
+            #[cfg(feature = "aws")]
+            match options {
+                Some(options) => {
+                    let store = options.build_aws(&cloud_location.bucket)?;
+                    Ok::<_, PolarsError>(Box::new(store) as Box<dyn ObjectStore>)
+                }
+                _ => return err_missing_configuration("aws", &cloud_location.scheme),
+            }
+            #[cfg(not(feature = "aws"))]
+            return err_missing_feature("aws", &cloud_location.scheme);
+        }
+        CloudType::Gcp => {
+            #[cfg(feature = "gcp")]
+            match options {
+                Some(options) => {
+                    let store = options.build_gcp(&cloud_location.bucket)?;
+                    Ok::<_, PolarsError>(Box::new(store) as Box<dyn ObjectStore>)
+                }
+                _ => return err_missing_configuration("gcp", &cloud_location.scheme),
+            }
+            #[cfg(not(feature = "gcp"))]
+            return err_missing_feature("gcp", &cloud_location.scheme);
+        }
+        CloudType::Azure => {
+            {
+                #[cfg(feature = "azure")]
+                match options {
+                    Some(options) => {
+                        let store = options.build_azure(&cloud_location.bucket)?;
+                        Ok::<_, PolarsError>(Box::new(store) as Box<dyn ObjectStore>)
+                    }
+                    _ => return err_missing_configuration("azure", &cloud_location.scheme),
+                }
+            }
+            #[cfg(not(feature = "azure"))]
+            return err_missing_feature("azure", &cloud_location.scheme);
+        }
+    }?;
+    Ok((cloud_location, store))
+}
diff --git a/polars/polars-io/src/lib.rs b/polars/polars-io/src/lib.rs
index 229d0f853344..32b321c2f85e 100644
--- a/polars/polars-io/src/lib.rs
+++ b/polars/polars-io/src/lib.rs
@@ -3,6 +3,8 @@
 #[cfg(feature = "avro")]
 #[cfg_attr(docsrs, doc(cfg(feature = "avro")))]
 pub mod avro;
+#[cfg(feature = "async")]
+mod cloud;
 #[cfg(any(feature = "csv-file", feature = "json"))]
 #[cfg_attr(docsrs, doc(cfg(feature = "csv-file")))]
 pub mod csv;
@@ -18,10 +20,8 @@ pub mod json;
 #[cfg(feature = "json")]
 #[cfg_attr(docsrs, doc(cfg(feature = "json")))]
 pub mod ndjson_core;
-#[cfg(feature = "object_store")]
-mod object_store;
-#[cfg(feature = "object_store")]
-pub use crate::object_store::glob as async_glob;
+#[cfg(feature = "async")]
+pub use crate::cloud::glob as async_glob;
 
 #[cfg(any(
     feature = "csv-file",
@@ -61,7 +61,6 @@ use polars_core::prelude::*;
     feature = "json",
     feature = "avro",
     feature = "ipc_streaming",
-    feature = "parquet-async"
 ))]
 use crate::predicates::PhysicalIoExpr;
 
@@ -108,7 +107,6 @@ pub trait ArrowReader {
     feature = "json",
     feature = "avro",
     feature = "ipc_streaming",
-    feature = "parquet-async"
 ))]
 pub(crate) fn finish_reader<R: ArrowReader>(
     mut reader: R,
diff --git a/polars/polars-io/src/parquet/async_impl.rs b/polars/polars-io/src/parquet/async_impl.rs
index eb28e836382c..3b98af823fde 100644
--- a/polars/polars-io/src/parquet/async_impl.rs
+++ b/polars/polars-io/src/parquet/async_impl.rs
@@ -11,15 +11,16 @@ use futures::lock::Mutex;
 use futures::{stream, StreamExt, TryFutureExt, TryStreamExt};
 use object_store::path::Path as ObjectPath;
 use object_store::ObjectStore;
+use polars_core::cloud::CloudOptions;
 use polars_core::datatypes::PlHashMap;
 use polars_core::error::PolarsResult;
 use polars_core::prelude::*;
 use polars_core::schema::Schema;
 
+use super::cloud::{build, CloudLocation, CloudReader};
 use super::mmap;
 use super::mmap::ColumnStore;
 use super::read_impl::FetchRowGroups;
-use crate::object_store::{build, CloudLocation, CloudReader};
 
 pub struct ParquetObjectStore {
     store: Arc<Mutex<Box<dyn ObjectStore>>>,
@@ -29,8 +30,8 @@ pub struct ParquetObjectStore {
 }
 
 impl ParquetObjectStore {
-    pub fn from_uri(uri: &str) -> PolarsResult<Self> {
-        let (CloudLocation { prefix, .. }, store) = build(uri)?;
+    pub fn from_uri(uri: &str, options: Option<&CloudOptions>) -> PolarsResult<Self> {
+        let (CloudLocation { prefix, .. }, store) = build(uri, options)?;
         let store = Arc::new(Mutex::from(store));
 
         Ok(ParquetObjectStore {
diff --git a/polars/polars-io/src/parquet/mmap.rs b/polars/polars-io/src/parquet/mmap.rs
index 5f20302a2c56..9035617d9b8c 100644
--- a/polars/polars-io/src/parquet/mmap.rs
+++ b/polars/polars-io/src/parquet/mmap.rs
@@ -3,7 +3,7 @@ use arrow::io::parquet::read::{
     column_iter_to_arrays, get_field_columns, ArrayIter, BasicDecompressor, ColumnChunkMetaData,
     PageReader,
 };
-#[cfg(feature = "parquet-async")]
+#[cfg(feature = "async")]
 use polars_core::datatypes::PlHashMap;
 
 use super::*;
@@ -21,7 +21,7 @@ use super::*;
 /// d. when all the data is available deserialize on multiple threads, for example using rayon
 pub enum ColumnStore<'a> {
     Local(&'a [u8]),
-    #[cfg(feature = "parquet-async")]
+    #[cfg(feature = "async")]
     Fetched(PlHashMap<u64, Vec<u8>>),
 }
 
@@ -45,7 +45,7 @@ fn _mmap_single_column<'a>(
     let (start, len) = meta.byte_range();
     let chunk = match store {
         ColumnStore::Local(file) => &file[start as usize..(start + len) as usize],
-        #[cfg(feature = "parquet-async")]
+        #[cfg(all(feature = "async", feature = "parquet"))]
         ColumnStore::Fetched(fetched) => {
             let entry = fetched.get(&start).unwrap_or_else(|| {
                 panic!(
-#[cfg(feature = "parquet-async")] +#[cfg(feature = "async")] pub(super) mod async_impl; pub(super) mod mmap; pub mod predicates; diff --git a/polars/polars-io/src/parquet/read.rs b/polars/polars-io/src/parquet/read.rs index 9cfd24fb04f9..a1f7ae2b4371 100644 --- a/polars/polars-io/src/parquet/read.rs +++ b/polars/polars-io/src/parquet/read.rs @@ -3,15 +3,17 @@ use std::sync::Arc; use arrow::io::parquet::read; use arrow::io::parquet::write::FileMetaData; +#[cfg(feature = "async")] +use polars_core::cloud::CloudOptions; use polars_core::prelude::*; #[cfg(feature = "serde")] use serde::{Deserialize, Serialize}; use super::read_impl::FetchRowGroupsFromMmapReader; use crate::mmap::MmapBytesReader; -#[cfg(feature = "parquet-async")] +#[cfg(feature = "async")] use crate::parquet::async_impl::FetchRowGroupsFromObjectStore; -#[cfg(feature = "parquet-async")] +#[cfg(feature = "async")] use crate::parquet::async_impl::ParquetObjectStore; use crate::parquet::read_impl::read_parquet; pub use crate::parquet::read_impl::BatchedParquetReader; @@ -208,7 +210,7 @@ impl SerReader for ParquetReader { /// A Parquet reader on top of the async object_store API. Only the batch reader is implemented since /// parquet files on cloud storage tend to be big and slow to access. -#[cfg(feature = "parquet-async")] +#[cfg(feature = "async")] pub struct ParquetAsyncReader { reader: ParquetObjectStore, rechunk: bool, @@ -218,11 +220,14 @@ pub struct ParquetAsyncReader { low_memory: bool, } -#[cfg(feature = "parquet-async")] +#[cfg(feature = "async")] impl ParquetAsyncReader { - pub fn from_uri(uri: &str) -> PolarsResult { + pub fn from_uri( + uri: &str, + cloud_options: Option<&CloudOptions>, + ) -> PolarsResult { Ok(ParquetAsyncReader { - reader: ParquetObjectStore::from_uri(uri)?, + reader: ParquetObjectStore::from_uri(uri, cloud_options)?, rechunk: false, n_rows: None, projection: None, @@ -233,8 +238,11 @@ impl ParquetAsyncReader { /// Fetch the file info in a synchronous way to for the query planning phase. 
#[tokio::main(flavor = "current_thread")] - pub async fn file_info(uri: &str) -> PolarsResult<(Schema, usize)> { - let mut reader = ParquetAsyncReader::from_uri(uri)?; + pub async fn file_info( + uri: &str, + options: Option<&CloudOptions>, + ) -> PolarsResult<(Schema, usize)> { + let mut reader = ParquetAsyncReader::from_uri(uri, options)?; let schema = reader.schema().await?; let num_rows = reader.num_rows().await?; Ok((schema, num_rows)) diff --git a/polars/polars-lazy/Cargo.toml b/polars/polars-lazy/Cargo.toml index 8a7c2dfee01d..bc4cd9641bc3 100644 --- a/polars/polars-lazy/Cargo.toml +++ b/polars/polars-lazy/Cargo.toml @@ -33,11 +33,10 @@ compile = ["polars-plan/compile"] streaming = ["chunked_ids", "polars-pipe/compile", "polars-plan/streaming"] default = ["compile", "private"] parquet = ["polars-core/parquet", "polars-io/parquet", "polars-plan/parquet", "polars-pipe/parquet"] -parquet-async = [ - "parquet", - "polars-plan/parquet-async", - "polars-io/parquet-async", - "polars-pipe/parquet-async", +async = [ + "polars-plan/async", + "polars-io/async", + "polars-pipe/async", "streaming", ] ipc = ["polars-io/ipc", "polars-plan/ipc"] diff --git a/polars/polars-lazy/polars-pipe/Cargo.toml b/polars/polars-lazy/polars-pipe/Cargo.toml index b7ca4a24b09c..0d02d19bd8ad 100644 --- a/polars/polars-lazy/polars-pipe/Cargo.toml +++ b/polars/polars-lazy/polars-pipe/Cargo.toml @@ -24,7 +24,7 @@ smartstring = { version = "1" } compile = [] csv-file = ["polars-plan/csv-file", "polars-io/csv-file"] parquet = ["polars-plan/parquet", "polars-io/parquet"] -parquet-async = ["polars-plan/parquet-async", "polars-io/parquet-async"] +async = ["polars-plan/async", "polars-io/async"] nightly = ["polars-core/nightly", "polars-utils/nightly", "hashbrown/nightly"] cross_join = ["polars-core/cross_join"] dtype-u8 = ["polars-core/dtype-u8"] diff --git a/polars/polars-lazy/polars-pipe/src/executors/sources/parquet.rs b/polars/polars-lazy/polars-pipe/src/executors/sources/parquet.rs index bb3b38adcc91..4b4d669d5e84 100644 --- a/polars/polars-lazy/polars-pipe/src/executors/sources/parquet.rs +++ b/polars/polars-lazy/polars-pipe/src/executors/sources/parquet.rs @@ -1,10 +1,11 @@ use std::path::PathBuf; +use polars_core::cloud::CloudOptions; use polars_core::error::PolarsResult; use polars_core::schema::*; use polars_core::POOL; use polars_io::parquet::{BatchedParquetReader, ParquetReader}; -#[cfg(feature = "parquet-async")] +#[cfg(feature = "async")] use polars_io::prelude::ParquetAsyncReader; use polars_io::{is_cloud_url, SerReader}; use polars_plan::prelude::ParquetOptions; @@ -20,9 +21,11 @@ pub struct ParquetSource { } impl ParquetSource { + #[allow(unused_variables)] pub(crate) fn new( path: PathBuf, options: ParquetOptions, + cloud_options: Option, schema: &Schema, ) -> PolarsResult { let projection: Option> = options.with_columns.map(|with_columns| { @@ -34,16 +37,16 @@ impl ParquetSource { let chunk_size = std::cmp::max(CHUNK_SIZE * 12 / POOL.current_num_threads(), 10_000); let batched_reader = if is_cloud_url(&path) { - #[cfg(not(feature = "parquet-async"))] + #[cfg(not(feature = "async"))] { panic!( - "Feature parquet-async is required to access parquet files on cloud storage." + "Feature 'async' (or more likely one of the cloud provider features) is required to access parquet files on cloud storage." ) } - #[cfg(feature = "parquet-async")] + #[cfg(feature = "async")] { let uri = path.to_string_lossy(); - ParquetAsyncReader::from_uri(&uri)? + ParquetAsyncReader::from_uri(&uri, cloud_options.as_ref())? 
diff --git a/polars/polars-lazy/Cargo.toml b/polars/polars-lazy/Cargo.toml
index 8a7c2dfee01d..bc4cd9641bc3 100644
--- a/polars/polars-lazy/Cargo.toml
+++ b/polars/polars-lazy/Cargo.toml
@@ -33,11 +33,10 @@ compile = ["polars-plan/compile"]
 streaming = ["chunked_ids", "polars-pipe/compile", "polars-plan/streaming"]
 default = ["compile", "private"]
 parquet = ["polars-core/parquet", "polars-io/parquet", "polars-plan/parquet", "polars-pipe/parquet"]
-parquet-async = [
-  "parquet",
-  "polars-plan/parquet-async",
-  "polars-io/parquet-async",
-  "polars-pipe/parquet-async",
+async = [
+  "polars-plan/async",
+  "polars-io/async",
+  "polars-pipe/async",
   "streaming",
 ]
 ipc = ["polars-io/ipc", "polars-plan/ipc"]
diff --git a/polars/polars-lazy/polars-pipe/Cargo.toml b/polars/polars-lazy/polars-pipe/Cargo.toml
index b7ca4a24b09c..0d02d19bd8ad 100644
--- a/polars/polars-lazy/polars-pipe/Cargo.toml
+++ b/polars/polars-lazy/polars-pipe/Cargo.toml
@@ -24,7 +24,7 @@ smartstring = { version = "1" }
 compile = []
 csv-file = ["polars-plan/csv-file", "polars-io/csv-file"]
 parquet = ["polars-plan/parquet", "polars-io/parquet"]
-parquet-async = ["polars-plan/parquet-async", "polars-io/parquet-async"]
+async = ["polars-plan/async", "polars-io/async"]
 nightly = ["polars-core/nightly", "polars-utils/nightly", "hashbrown/nightly"]
 cross_join = ["polars-core/cross_join"]
 dtype-u8 = ["polars-core/dtype-u8"]
diff --git a/polars/polars-lazy/polars-pipe/src/executors/sources/parquet.rs b/polars/polars-lazy/polars-pipe/src/executors/sources/parquet.rs
index bb3b38adcc91..4b4d669d5e84 100644
--- a/polars/polars-lazy/polars-pipe/src/executors/sources/parquet.rs
+++ b/polars/polars-lazy/polars-pipe/src/executors/sources/parquet.rs
@@ -1,10 +1,11 @@
 use std::path::PathBuf;
 
+use polars_core::cloud::CloudOptions;
 use polars_core::error::PolarsResult;
 use polars_core::schema::*;
 use polars_core::POOL;
 use polars_io::parquet::{BatchedParquetReader, ParquetReader};
-#[cfg(feature = "parquet-async")]
+#[cfg(feature = "async")]
 use polars_io::prelude::ParquetAsyncReader;
 use polars_io::{is_cloud_url, SerReader};
 use polars_plan::prelude::ParquetOptions;
@@ -20,9 +21,11 @@ pub struct ParquetSource {
 }
 
 impl ParquetSource {
+    #[allow(unused_variables)]
     pub(crate) fn new(
         path: PathBuf,
         options: ParquetOptions,
+        cloud_options: Option<CloudOptions>,
         schema: &Schema,
     ) -> PolarsResult<Self> {
         let projection: Option<Vec<usize>> = options.with_columns.map(|with_columns| {
@@ -34,16 +37,16 @@ impl ParquetSource {
         let chunk_size = std::cmp::max(CHUNK_SIZE * 12 / POOL.current_num_threads(), 10_000);
 
         let batched_reader = if is_cloud_url(&path) {
-            #[cfg(not(feature = "parquet-async"))]
+            #[cfg(not(feature = "async"))]
             {
                 panic!(
-                    "Feature parquet-async is required to access parquet files on cloud storage."
+                    "Feature 'async' (or more likely one of the cloud provider features) is required to access parquet files on cloud storage."
                 )
             }
-            #[cfg(feature = "parquet-async")]
+            #[cfg(feature = "async")]
             {
                 let uri = path.to_string_lossy();
-                ParquetAsyncReader::from_uri(&uri)?
+                ParquetAsyncReader::from_uri(&uri, cloud_options.as_ref())?
                     .with_n_rows(options.n_rows)
                     .with_row_count(options.row_count)
                     .with_projection(projection)
diff --git a/polars/polars-lazy/polars-pipe/src/pipeline/convert.rs b/polars/polars-lazy/polars-pipe/src/pipeline/convert.rs
index 8035afcf3df0..9f60e2db50e0 100644
--- a/polars/polars-lazy/polars-pipe/src/pipeline/convert.rs
+++ b/polars/polars-lazy/polars-pipe/src/pipeline/convert.rs
@@ -84,6 +84,7 @@ where
                 path,
                 file_info,
                 options,
+                cloud_options,
                 predicate,
                 output_schema,
                 ..
@@ -95,7 +96,7 @@ where
                     let op = Box::new(op) as Box<dyn Operator>;
                     operator_objects.push(op)
                 }
-                let src = sources::ParquetSource::new(path, options, &file_info.schema)?;
+                let src = sources::ParquetSource::new(path, options, cloud_options, &file_info.schema)?;
                 Ok(Box::new(src) as Box<dyn Source>)
             }
             _ => todo!(),
diff --git a/polars/polars-lazy/polars-plan/Cargo.toml b/polars/polars-lazy/polars-plan/Cargo.toml
index 67042b330e8a..29dba85b8c79 100644
--- a/polars/polars-lazy/polars-plan/Cargo.toml
+++ b/polars/polars-lazy/polars-plan/Cargo.toml
@@ -30,7 +30,7 @@ compile = []
 default = ["compile", "private"]
 streaming = []
 parquet = ["polars-core/parquet", "polars-io/parquet"]
-parquet-async = ["parquet", "polars-io/parquet-async"]
+async = []
 ipc = ["polars-io/ipc"]
 json = ["polars-io/json"]
 csv-file = ["polars-io/csv-file"]
diff --git a/polars/polars-lazy/polars-plan/src/logical_plan/alp.rs b/polars/polars-lazy/polars-plan/src/logical_plan/alp.rs
index bb2e3d4c9bc1..c0bd31ac2d73 100644
--- a/polars/polars-lazy/polars-plan/src/logical_plan/alp.rs
+++ b/polars/polars-lazy/polars-plan/src/logical_plan/alp.rs
@@ -3,6 +3,8 @@ use std::borrow::Cow;
 use std::path::PathBuf;
 use std::sync::Arc;
 
+#[cfg(feature = "parquet")]
+use polars_core::cloud::CloudOptions;
 use polars_core::frame::explode::MeltArgs;
 use polars_core::prelude::*;
 use polars_utils::arena::{Arena, Node};
@@ -72,6 +74,7 @@ pub enum ALogicalPlan {
         output_schema: Option<SchemaRef>,
         predicate: Option<Node>,
         options: ParquetOptions,
+        cloud_options: Option<CloudOptions>,
     },
     DataFrameScan {
         df: Arc<DataFrame>,
@@ -375,6 +378,7 @@ impl ALogicalPlan {
                 output_schema,
                 predicate,
                 options,
+                cloud_options,
                 ..
             } => {
                 let mut new_predicate = None;
@@ -388,6 +392,7 @@ impl ALogicalPlan {
                     output_schema: output_schema.clone(),
                     predicate: new_predicate,
                     options: options.clone(),
+                    cloud_options: cloud_options.clone(),
                 }
             }
             #[cfg(feature = "csv-file")]
diff --git a/polars/polars-lazy/polars-plan/src/logical_plan/builder.rs b/polars/polars-lazy/polars-plan/src/logical_plan/builder.rs
index dd7e16ae2912..fc636d13c386 100644
--- a/polars/polars-lazy/polars-plan/src/logical_plan/builder.rs
+++ b/polars/polars-lazy/polars-plan/src/logical_plan/builder.rs
@@ -3,6 +3,8 @@ use std::ops::Deref;
 use std::path::PathBuf;
 use std::sync::Mutex;
 
+#[cfg(feature = "parquet")]
+use polars_core::cloud::CloudOptions;
 use polars_core::frame::_duplicate_err;
 use polars_core::frame::explode::MeltArgs;
 use polars_core::prelude::*;
@@ -12,7 +14,7 @@ use polars_io::csv::utils::infer_file_schema;
 use polars_io::csv::CsvEncoding;
 #[cfg(feature = "ipc")]
 use polars_io::ipc::IpcReader;
-#[cfg(feature = "parquet-async")]
+#[cfg(all(feature = "parquet", feature = "async"))]
 use polars_io::parquet::ParquetAsyncReader;
 #[cfg(feature = "parquet")]
 use polars_io::parquet::ParquetReader;
@@ -99,6 +101,7 @@ impl LogicalPlanBuilder {
 
     #[cfg(any(feature = "parquet", feature = "parquet_async"))]
     #[cfg_attr(docsrs, doc(cfg(feature = "parquet")))]
+    #[allow(clippy::too_many_arguments)]
     pub fn scan_parquet<P: Into<PathBuf>>(
         path: P,
         n_rows: Option<usize>,
@@ -107,18 +110,22 @@ impl LogicalPlanBuilder {
         row_count: Option<RowCount>,
         rechunk: bool,
         low_memory: bool,
+        cloud_options: Option<CloudOptions>,
     ) -> PolarsResult<Self> {
         use polars_io::{is_cloud_url, SerReader as _};
 
         let path = path.into();
         let file_info: PolarsResult<FileInfo> = if is_cloud_url(&path) {
-            #[cfg(not(feature = "parquet-async"))]
-            panic!("The feature parquet-async needs to be enabled in order to access parquet on cloud storage.");
+            #[cfg(not(feature = "async"))]
+            panic!(
+                "One or more of the cloud storage features ('aws', 'gcp', ...) must be enabled."
+            );
 
-            #[cfg(feature = "parquet-async")]
+            #[cfg(feature = "async")]
             {
                 let uri = path.to_string_lossy();
-                let (schema, num_rows) = ParquetAsyncReader::file_info(&uri)?;
+                let (schema, num_rows) =
+                    ParquetAsyncReader::file_info(&uri, cloud_options.as_ref())?;
                 Ok(FileInfo {
                     schema: Arc::new(schema),
                     row_estimation: (Some(num_rows), num_rows),
@@ -150,6 +157,7 @@ impl LogicalPlanBuilder {
                 file_counter: Default::default(),
                 low_memory,
             },
+            cloud_options,
         }
         .into())
     }
diff --git a/polars/polars-lazy/polars-plan/src/logical_plan/conversion.rs b/polars/polars-lazy/polars-plan/src/logical_plan/conversion.rs
index 86e7703d42b5..34dcba3f7939 100644
--- a/polars/polars-lazy/polars-plan/src/logical_plan/conversion.rs
+++ b/polars/polars-lazy/polars-plan/src/logical_plan/conversion.rs
@@ -242,12 +242,14 @@ pub fn to_alp(
             file_info,
             predicate,
             options,
+            cloud_options,
         } => ALogicalPlan::ParquetScan {
             path,
             file_info,
             output_schema: None,
             predicate: predicate.map(|expr| to_aexpr(expr, expr_arena)),
             options,
+            cloud_options,
         },
         LogicalPlan::DataFrameScan {
             df,
@@ -732,11 +734,13 @@ impl ALogicalPlan {
             output_schema: _,
             predicate,
             options,
+            cloud_options,
         } => LogicalPlan::ParquetScan {
             path,
             file_info,
             predicate: predicate.map(|n| node_to_expr(n, expr_arena)),
             options,
+            cloud_options,
         },
         ALogicalPlan::DataFrameScan {
             df,
diff --git a/polars/polars-lazy/polars-plan/src/logical_plan/mod.rs b/polars/polars-lazy/polars-plan/src/logical_plan/mod.rs
index de7b708ba2f6..9db50381e0b4 100644
--- a/polars/polars-lazy/polars-plan/src/logical_plan/mod.rs
+++ b/polars/polars-lazy/polars-plan/src/logical_plan/mod.rs
@@ -4,6 +4,8 @@ use std::fmt::Debug;
 use std::path::PathBuf;
 use std::sync::{Arc, Mutex};
 
+#[cfg(feature = "parquet")]
+use polars_core::cloud::CloudOptions;
 use polars_core::prelude::*;
 
 use crate::logical_plan::LogicalPlan::DataFrameScan;
@@ -103,6 +105,7 @@ pub enum LogicalPlan {
         file_info: FileInfo,
         predicate: Option<Expr>,
         options: ParquetOptions,
+        cloud_options: Option<CloudOptions>,
     },
     #[cfg(feature = "ipc")]
     #[cfg_attr(docsrs, doc(cfg(feature = "ipc")))]
diff --git a/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/file_caching.rs b/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/file_caching.rs
index f6eba528dc7e..b9cc113b2318 100644
--- a/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/file_caching.rs
+++ b/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/file_caching.rs
@@ -280,6 +280,7 @@ impl FileCacher {
                     output_schema,
                     predicate,
                     mut options,
+                    cloud_options,
                 } => {
                     let predicate_expr = predicate.map(|node| node_to_expr(node, expr_arena));
                     let finger_print = FileFingerPrint {
@@ -305,6 +306,7 @@ impl FileCacher {
                         output_schema,
                         predicate,
                         options: options.clone(),
+                        cloud_options,
                     };
                     let lp = self.finish_rewrite(
                         lp,
diff --git a/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/predicate_pushdown/mod.rs b/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/predicate_pushdown/mod.rs
index aa7fc2194df7..7b2ef223dd35 100644
--- a/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/predicate_pushdown/mod.rs
+++ b/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/predicate_pushdown/mod.rs
@@ -291,6 +291,7 @@ impl PredicatePushDown {
                 output_schema,
                 predicate,
                 options,
+                cloud_options,
             } => {
                 let local_predicates = partition_by_full_context(&mut acc_predicates, expr_arena);
 
@@ -302,6 +303,7 @@ impl PredicatePushDown {
                     output_schema,
                     predicate,
                     options,
+                    cloud_options,
                 };
                 Ok(self.optional_apply_predicate(lp, local_predicates, lp_arena, expr_arena))
             }
diff --git a/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/projection_pushdown/mod.rs b/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/projection_pushdown/mod.rs
index 23baeb172c49..df0217cd14fa 100644
--- a/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/projection_pushdown/mod.rs
+++ b/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/projection_pushdown/mod.rs
@@ -445,6 +445,7 @@ impl ProjectionPushDown {
                 file_info,
                 predicate,
                 mut options,
+                cloud_options,
                 ..
             } => {
                 let with_columns = get_scan_columns(&mut acc_projections, expr_arena);
@@ -466,6 +467,7 @@ impl ProjectionPushDown {
                     output_schema,
                     predicate,
                     options,
+                    cloud_options,
                 };
                 Ok(lp)
             }
diff --git a/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/slice_pushdown_lp.rs b/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/slice_pushdown_lp.rs
index a63a3f7c4f83..14869155c121 100644
--- a/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/slice_pushdown_lp.rs
+++ b/polars/polars-lazy/polars-plan/src/logical_plan/optimizer/slice_pushdown_lp.rs
@@ -131,6 +131,7 @@ impl SlicePushDown {
             output_schema,
             predicate,
             options,
+            cloud_options,
         },
 
         // TODO! we currently skip slice pushdown if there is a predicate.
@@ -143,7 +144,8 @@ impl SlicePushDown {
             file_info,
             output_schema,
             predicate,
-            options
+            options,
+            cloud_options,
         };
         Ok(lp)
diff --git a/polars/polars-lazy/src/frame/parquet.rs b/polars/polars-lazy/src/frame/parquet.rs
index 3d56586316e8..e478d66f516a 100644
--- a/polars/polars-lazy/src/frame/parquet.rs
+++ b/polars/polars-lazy/src/frame/parquet.rs
@@ -1,7 +1,8 @@
 use std::path::{Path, PathBuf};
 
+use polars_core::cloud::CloudOptions;
 use polars_core::prelude::*;
-#[cfg(feature = "parquet-async")]
+#[cfg(feature = "async")]
 use polars_io::async_glob;
 use polars_io::parquet::ParallelStrategy;
 use polars_io::{is_cloud_url, RowCount};
@@ -16,6 +17,7 @@ pub struct ScanArgsParquet {
     pub rechunk: bool,
     pub row_count: Option<RowCount>,
     pub low_memory: bool,
+    pub cloud_options: Option<CloudOptions>,
 }
 
 impl Default for ScanArgsParquet {
@@ -27,11 +29,13 @@ impl Default for ScanArgsParquet {
             rechunk: true,
             row_count: None,
             low_memory: false,
+            cloud_options: None,
         }
     }
 }
 
 impl LazyFrame {
+    #[allow(clippy::too_many_arguments)]
     fn scan_parquet_impl(
         path: impl AsRef<Path>,
         n_rows: Option<usize>,
@@ -40,6 +44,7 @@ impl LazyFrame {
         row_count: Option<RowCount>,
         rechunk: bool,
         low_memory: bool,
+        cloud_options: Option<CloudOptions>,
     ) -> PolarsResult<Self> {
         let mut lf: LazyFrame = LogicalPlanBuilder::scan_parquet(
             path.as_ref(),
@@ -49,6 +54,7 @@ impl LazyFrame {
             None,
             rechunk,
             low_memory,
+            cloud_options,
         )?
         .build()
         .into();
@@ -92,6 +98,7 @@ impl LazyFrame {
                     None,
                     args.rechunk,
                     args.low_memory,
+                    args.cloud_options.clone(),
                 )
             })
             .collect::<PolarsResult<Vec<_>>>()?;
@@ -106,16 +113,16 @@ impl LazyFrame {
         let path_str = path.to_string_lossy();
         if path_str.contains('*') {
             let paths = if is_cloud_url(path) {
-                #[cfg(feature = "parquet-async")]
+                #[cfg(feature = "async")]
                 {
                     Box::new(
-                        async_glob(&path_str)?
+                        async_glob(&path_str, args.cloud_options.as_ref())?
                             .into_iter()
                             .map(|a| Ok(PathBuf::from(&a))),
                     )
                 }
-                #[cfg(not(feature="parquet-async"))]
-                panic!("Feature `parquet-async` must be enabled to use globbing patterns with cloud urls.")
+                #[cfg(not(feature = "async"))]
+                panic!("Feature `async` must be enabled to use globbing patterns with cloud urls.")
             } else {
                 Box::new(
                     glob::glob(&path_str).map_err(|_| {
@@ -134,6 +141,7 @@ impl LazyFrame {
                         None,
                         false,
                         args.low_memory,
+                        args.cloud_options.clone(),
                     )
                     .map_err(|e| {
                         PolarsError::ComputeError(
@@ -158,6 +166,7 @@ impl LazyFrame {
             args.row_count,
             args.rechunk,
             args.low_memory,
+            args.cloud_options,
         )
     }
 }
diff --git a/polars/polars-lazy/src/physical_plan/executors/scan/parquet.rs b/polars/polars-lazy/src/physical_plan/executors/scan/parquet.rs
index 3d0513faa0d9..989fcdf67bb2 100644
--- a/polars/polars-lazy/src/physical_plan/executors/scan/parquet.rs
+++ b/polars/polars-lazy/src/physical_plan/executors/scan/parquet.rs
@@ -1,10 +1,14 @@
+use polars_core::cloud::CloudOptions;
+
 use super::*;
 
+#[allow(dead_code)]
 pub struct ParquetExec {
     path: PathBuf,
     schema: SchemaRef,
     predicate: Option<Arc<dyn PhysicalExpr>>,
     options: ParquetOptions,
+    cloud_options: Option<CloudOptions>,
 }
 
 impl ParquetExec {
@@ -13,12 +17,14 @@ impl ParquetExec {
         schema: SchemaRef,
         predicate: Option<Arc<dyn PhysicalExpr>>,
         options: ParquetOptions,
+        cloud_options: Option<CloudOptions>,
     ) -> Self {
         ParquetExec {
             path,
             schema,
             predicate,
             options,
+            cloud_options,
         }
     }
diff --git a/polars/polars-lazy/src/physical_plan/planner/lp.rs b/polars/polars-lazy/src/physical_plan/planner/lp.rs
index eb898b5ad193..50b5c5852566 100644
--- a/polars/polars-lazy/src/physical_plan/planner/lp.rs
+++ b/polars/polars-lazy/src/physical_plan/planner/lp.rs
@@ -221,6 +221,7 @@ pub fn create_physical_plan(
             output_schema,
             predicate,
             options,
+            cloud_options,
         } => {
             let predicate = predicate
                 .map(|pred| {
@@ -233,6 +234,7 @@ pub fn create_physical_plan(
                 file_info.schema,
                 predicate,
                 options,
+                cloud_options,
             )))
         }
         Projection {
diff --git a/py-polars/Cargo.lock b/py-polars/Cargo.lock
index a4fc0e626045..29807be70fdf 100644
--- a/py-polars/Cargo.lock
+++ b/py-polars/Cargo.lock
@@ -547,6 +547,12 @@ dependencies = [
  "winapi",
 ]
 
+[[package]]
+name = "doc-comment"
+version = "0.3.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10"
+
 [[package]]
 name = "dyn-clone"
 version = "1.0.10"
@@ -888,6 +894,15 @@ dependencies = [
  "ghost",
 ]
 
+[[package]]
+name = "itertools"
+version = "0.10.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b0fd2260e829bddf4cb6ea802289de2f86d6a7a690192fbe91b3f46e0f2c8473"
+dependencies = [
+ "either",
+]
+
 [[package]]
 name = "itoa"
 version = "1.0.5"
@@ -1344,6 +1359,25 @@ dependencies = [
  "pyo3",
 ]
 
+[[package]]
+name = "object_store"
+version = "0.5.3"
+source = "git+https://github.com/apache/arrow-rs?rev=c74665808439cb7020fb1cfb74b376a136c73259#c74665808439cb7020fb1cfb74b376a136c73259"
+dependencies = [
+ "async-trait",
+ "bytes",
+ "chrono",
+ "futures",
+ "itertools",
+ "parking_lot",
+ "percent-encoding",
+ "snafu",
+ "tokio",
+ "tracing",
+ "url",
+ "walkdir",
+]
+
 [[package]]
 name = "once_cell"
 version = "1.17.0"
@@ -1551,6 +1585,7 @@ dependencies = [
  "indexmap",
  "ndarray",
  "num",
+ "object_store",
  "once_cell",
  "polars-arrow",
  "polars-utils",
@@ -1581,6 +1616,7 @@ dependencies = [
  "memchr",
  "memmap2",
  "num",
+ "object_store",
  "once_cell",
  "polars-arrow",
  "polars-core",
@@ -1945,6 +1981,15 @@ version = "1.0.12"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "7b4b9743ed687d4b4bcedf9ff5eaa7398495ae14e61cba0a295704edbc7decde"
 
+[[package]]
+name = "same-file"
+version = "1.0.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502"
+dependencies = [
+ "winapi-util",
+]
+
 [[package]]
 name = "scopeguard"
 version = "1.1.0"
@@ -2086,6 +2131,28 @@ dependencies = [
  "version_check",
 ]
 
+[[package]]
+name = "snafu"
+version = "0.7.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "cb0656e7e3ffb70f6c39b3c2a86332bb74aa3c679da781642590f3c1118c5045"
+dependencies = [
+ "doc-comment",
+ "snafu-derive",
+]
+
+[[package]]
+name = "snafu-derive"
+version = "0.7.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "475b3bbe5245c26f2d8a6f62d67c1f30eb9fffeccee721c45d162c3ebbdf81b2"
+dependencies = [
+ "heck",
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
 [[package]]
 name = "snap"
 version = "1.1.0"
@@ -2228,6 +2295,31 @@
 version = "0.1.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c"
 
+[[package]]
+name = "tokio"
+version = "1.23.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "eab6d665857cc6ca78d6e80303a02cea7a7851e85dfbd77cbdc09bd129f1ef46"
+dependencies = [
+ "autocfg",
+ "bytes",
+ "memchr",
+ "pin-project-lite",
+ "tokio-macros",
+ "windows-sys",
+]
+
+[[package]]
+name = "tokio-macros"
+version = "1.8.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d266c00fde287f55d3f1c3e96c500c362a2b8c695076ec180f27918820bc6df8"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
 [[package]]
 name = "toml"
 version = "0.5.10"
@@ -2237,6 +2329,38 @@ dependencies = [
  "serde",
 ]
 
+[[package]]
+name = "tracing"
+version = "0.1.37"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8"
+dependencies = [
+ "cfg-if",
+ "pin-project-lite",
+ "tracing-attributes",
+ "tracing-core",
+]
+
+[[package]]
+name = "tracing-attributes"
+version = "0.1.23"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4017f8f45139870ca7e672686113917c71c7a6e02d4924eda67186083c03081a"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
+[[package]]
+name = "tracing-core"
+version = "0.1.30"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "24eb03ba0eab1fd845050058ce5e616558e8f8d8fca633e6b163fe25c797213a"
+dependencies = [
+ "once_cell",
+]
+
 [[package]]
 name = "uncased"
 version = "0.9.7"
@@ -2314,6 +2438,17 @@
 version = "0.9.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f"
 
+[[package]]
+name = "walkdir"
+version = "2.3.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "808cf2735cd4b6866113f648b791c6adc5714537bc222d9347bb203386ffda56"
+dependencies = [
+ "same-file",
+ "winapi",
+ "winapi-util",
+]
+
 [[package]]
 name = "wasi"
 version = "0.10.0+wasi-snapshot-preview1"
diff --git a/py-polars/src/lazy/dataframe.rs b/py-polars/src/lazy/dataframe.rs
index 9b607d4ae338..558e8a9e9f4e 100644
--- a/py-polars/src/lazy/dataframe.rs
+++ b/py-polars/src/lazy/dataframe.rs
@@ -1,4 +1,5 @@
 use std::borrow::Cow;
+use std::collections::HashMap;
 use std::io::BufWriter;
 use std::path::PathBuf;
 
@@ -11,6 +12,7 @@ use polars::lazy::frame::{AllowedOptimizations, LazyFrame, LazyGroupBy};
 use polars::lazy::prelude::col;
 use polars::prelude::{ClosedWindow, CsvEncoding, DataFrame, Field, JoinType, Schema};
 use polars::time::*;
+use polars_core::cloud;
 use polars_core::frame::explode::MeltArgs;
 use polars_core::frame::UniqueKeepStrategy;
 use polars_core::prelude::*;
@@ -34,6 +36,15 @@ pub struct PyLazyGroupBy {
     // option because we cannot get a self by value in pyo3
     pub lgb: Option<LazyGroupBy>,
 }
+/// Extract CloudOptions from a Python object.
+fn extract_cloud_options(url: &str, py_object: PyObject) -> PyResult<cloud::CloudOptions> {
+    let untyped_options = Python::with_gil(|py| py_object.extract::<HashMap<String, String>>(py))
+        .expect("Expected a dictionary for cloud_options");
+    Ok(
+        cloud::CloudOptions::from_untyped_config(url, untyped_options)
+            .map_err(PyPolarsErr::from)?,
+    )
+}
 
 #[pymethods]
 impl PyLazyGroupBy {
@@ -264,6 +275,7 @@ impl PyLazyFrame {
 
     #[cfg(feature = "parquet")]
     #[staticmethod]
+    #[allow(clippy::too_many_arguments)]
     pub fn new_from_parquet(
         path: String,
         n_rows: Option<usize>,
@@ -272,7 +284,11 @@ impl PyLazyFrame {
         rechunk: bool,
         row_count: Option<(String, IdxSize)>,
         low_memory: bool,
+        cloud_options: Option<PyObject>,
     ) -> PyResult<Self> {
+        let cloud_options = cloud_options
+            .map(|po| extract_cloud_options(&path, po))
+            .transpose()?;
         let row_count = row_count.map(|(name, offset)| RowCount { name, offset });
         let args = ScanArgsParquet {
             n_rows,
@@ -281,6 +297,7 @@ impl PyLazyFrame {
             rechunk,
             row_count,
             low_memory,
+            cloud_options,
         };
         let lf = LazyFrame::scan_parquet(path, args).map_err(PyPolarsErr::from)?;
         Ok(lf.into())
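One property worth noting: because the untyped keys are parsed into provider enums up front, a bad configuration fails at plan time rather than at first fetch. A minimal sketch of that behavior (the key is deliberately invalid):

```rust
use polars_core::cloud::CloudOptions;

fn main() {
    let res = CloudOptions::from_untyped_config(
        "s3://my-bucket/data.parquet",
        [("not_a_real_key", "value")],
    );
    // Fails with ComputeError("Unknown configuration key not_a_real_key.")
    // when the `aws` feature is enabled, and with an unknown-scheme or
    // missing-feature error otherwise; never silently.
    assert!(res.is_err());
}
```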