Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rust): generalize the cloud storage builders #5972

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,5 @@ features = [
"compute_filter",
"compute_if_then_else",
]

[patch.crates-io]
# packed_simd_2 = { git = "https://github.com/rust-lang/packed_simd", rev = "e57c7ba11386147e6d2cbad7c88f376aab4bdc86" }
2 changes: 1 addition & 1 deletion examples/read_parquet_cloud/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@ edition = "2021"

[dependencies]
aws-creds = "0.34.0"
polars = { path = "../../polars", features = ["lazy", "parquet-async"] }
polars = { path = "../../polars", features = ["lazy", "aws"] }
16 changes: 10 additions & 6 deletions examples/read_parquet_cloud/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::env;

use awscreds::Credentials;
use cloud::AmazonS3ConfigKey as Key;
use polars::prelude::*;

// Login to your aws account and then copy the ../datasets/foods1.parquet file to your own bucket.
Expand All @@ -9,11 +8,16 @@ const TEST_S3: &str = "s3://lov2test/polars/datasets/*.parquet";

fn main() -> PolarsResult<()> {
let cred = Credentials::default().unwrap();
env::set_var("AWS_SECRET_ACCESS_KEY", cred.secret_key.unwrap());
env::set_var("AWS_DEFAULT_REGION", "us-west-2");
env::set_var("AWS_ACCESS_KEY_ID", cred.access_key.unwrap());

let df = LazyFrame::scan_parquet(TEST_S3, ScanArgsParquet::default())?
// Propagate the credentials and other cloud options.
let mut args = ScanArgsParquet::default();
let cloud_options = cloud::CloudOptions::default().with_aws([
(Key::AccessKeyId, &cred.access_key.unwrap()),
(Key::SecretAccessKey, &cred.secret_key.unwrap()),
(Key::Region, &"us-west-2".into()),
]);
args.cloud_options = Some(cloud_options);
let df = LazyFrame::scan_parquet(TEST_S3, args)?
.with_streaming(true)
.select([
// select all columns
Expand Down
5 changes: 4 additions & 1 deletion polars/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ ndarray = ["polars-core/ndarray"]
serde = ["polars-core/serde"]
serde-lazy = ["polars-core/serde-lazy", "polars-lazy/serde", "polars-time/serde", "polars-io/serde", "polars-ops/serde"]
parquet = ["polars-io", "polars-core/parquet", "polars-lazy/parquet", "polars-io/parquet"]
parquet-async = ["parquet", "polars-io", "polars-core/parquet", "polars-lazy/parquet-async", "polars-io/parquet"]
async = ["polars-lazy/async"]
aws = ["async", "polars-io/aws"]
azure = ["async", "polars-io/azure"]
gcp = ["async", "polars-io/gcp"]
lazy = ["polars-core/lazy", "polars-lazy", "polars-lazy/compile"]
# commented out until UB is fixed
# parallel = ["polars-core/parallel"]
Expand Down
8 changes: 8 additions & 0 deletions polars/polars-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,12 @@ docs-selection = [
"partition_by",
]

# Cloud support.
"async" = ["url"]
"aws" = ["async", "object_store/aws"]
"azure" = ["async", "object_store/azure"]
"gcp" = ["async", "object_store/gcp"]

[dependencies]
ahash.workspace = true
anyhow.workspace = true
Expand All @@ -161,6 +167,7 @@ hex = { version = "0.4", optional = true }
indexmap = { version = "1", features = ["std"] }
ndarray = { version = "0.15", optional = true, default_features = false }
num.workspace = true
object_store = { version = "0.5.3", default-features = false, optional = true }
once_cell = "1"
polars-arrow = { version = "0.26.1", path = "../polars-arrow", features = ["compute"] }
polars-utils = { version = "0.26.1", path = "../polars-utils" }
Expand All @@ -173,6 +180,7 @@ serde = { version = "1", features = ["derive"], optional = true }
serde_json = { version = "1", optional = true }
smartstring = { version = "1" }
thiserror.workspace = true
url = { version = "2.3.1", optional = true }
xxhash-rust.workspace = true

[target.'cfg(target_family = "wasm")'.dependencies]
Expand Down
234 changes: 234 additions & 0 deletions polars/polars-core/src/cloud.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
use std::str::FromStr;

#[cfg(feature = "aws")]
use object_store::aws::AmazonS3Builder;
#[cfg(feature = "aws")]
pub use object_store::aws::AmazonS3ConfigKey;
#[cfg(feature = "azure")]
pub use object_store::azure::AzureConfigKey;
#[cfg(feature = "azure")]
use object_store::azure::MicrosoftAzureBuilder;
#[cfg(feature = "gcp")]
use object_store::gcp::GoogleCloudStorageBuilder;
#[cfg(feature = "gcp")]
pub use object_store::gcp::GoogleConfigKey;
#[cfg(feature = "async")]
use object_store::ObjectStore;
#[cfg(feature = "serde-lazy")]
use serde::{Deserialize, Serialize};
#[cfg(feature = "async")]
use url::Url;

use crate::error::{PolarsError, PolarsResult};

/// The type of the config keys must satisfy the following requirements:
/// 1. must be easily collected into a HashMap, the type required by the `object_store` API.
/// 2. be serializable, required when the `serde-lazy` feature is enabled.
/// 3. not actually use HashMap, since that type is disallowed in Polars for performance reasons.
///
/// Currently this type is a vector of `(config key, config value)` pairs.
// NOTE(review): the lint allowance presumably covers builds where no cloud
// feature is enabled and nothing ends up using the alias — confirm.
#[allow(dead_code)]
type Configs<T> = Vec<(T, String)>;

/// Options to connect to various cloud providers.
///
/// Every provider-specific field only exists when the matching cargo feature
/// (`aws`, `azure`, `gcp`) is enabled. A default-constructed value carries no
/// configuration for any provider.
#[derive(Clone, Debug, Default)]
#[cfg_attr(feature = "serde-lazy", derive(Serialize, Deserialize))]
pub struct CloudOptions {
    /// Typed AWS S3 configuration; `None` until `with_aws` stores one.
    #[cfg(feature = "aws")]
    aws: Option<Configs<AmazonS3ConfigKey>>,
    /// Typed Azure configuration; `None` until `with_azure` stores one.
    #[cfg(feature = "azure")]
    azure: Option<Configs<AzureConfigKey>>,
    /// Typed Google Cloud Storage configuration; `None` until `with_gcp` stores one.
    #[cfg(feature = "gcp")]
    gcp: Option<Configs<GoogleConfigKey>>,
}

/// Parse an untyped configuration (string keys) into a typed configuration for
/// the configuration key type `T`.
///
/// Entries are converted in iteration order. Conversion stops at the first key
/// that `T::from_str` rejects.
///
/// # Errors
/// Returns `PolarsError::ComputeError` naming the first key that cannot be
/// parsed into `T`.
#[allow(dead_code)]
fn parsed_untyped_config<T, I: IntoIterator<Item = (impl AsRef<str>, impl Into<String>)>>(
    config: I,
) -> PolarsResult<Configs<T>>
where
    T: FromStr + std::cmp::Eq + std::hash::Hash,
{
    let mut typed: Configs<T> = Vec::new();
    for (raw_key, raw_value) in config {
        let name = raw_key.as_ref();
        match T::from_str(name) {
            Ok(typed_key) => typed.push((typed_key, raw_value.into())),
            // The parser's own error is dropped on purpose: only the offending key is reported.
            Err(_) => {
                let message = format!("Unknown configuration key {}.", name);
                return Err(PolarsError::ComputeError(message.into()));
            }
        }
    }
    Ok(typed)
}

/// The kind of storage backend a URL refers to.
///
/// Values are obtained by parsing a URL through the `FromStr` implementation
/// of this type, which classifies the URL by its scheme.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum CloudType {
    /// Amazon S3.
    Aws,
    /// Microsoft Azure storage.
    Azure,
    /// A `file` URL, i.e. no cloud provider involved.
    File,
    /// Google Cloud Storage.
    Gcp,
}

impl FromStr for CloudType {
    type Err = PolarsError;

    /// Classify `url` by its scheme.
    ///
    /// # Errors
    /// Fails when `url` cannot be parsed as a URL, or when its scheme is not
    /// one of the recognised ones.
    #[cfg(feature = "async")]
    fn from_str(url: &str) -> Result<Self, Self::Err> {
        let parsed = Url::parse(url).map_err(anyhow::Error::from)?;
        let cloud_type = match parsed.scheme() {
            "file" => Self::File,
            "s3" => Self::Aws,
            "gs" | "gcp" => Self::Gcp,
            "az" | "adl" | "abfs" => Self::Azure,
            _ => return Err(PolarsError::ComputeError("Unknown url scheme.".into())),
        };
        Ok(cloud_type)
    }

    /// Fallback used when the `async` feature is disabled: the URL is not
    /// inspected and every input is rejected.
    #[cfg(not(feature = "async"))]
    fn from_str(_url: &str) -> Result<Self, Self::Err> {
        let message = "At least one of the cloud features must be enabled.";
        Err(PolarsError::ComputeError(message.into()))
    }
}

impl CloudOptions {
    /// Set the configuration for AWS connections. This is the preferred API from Rust.
    ///
    /// Replaces any AWS configuration stored previously and returns the
    /// updated options so calls can be chained.
    #[cfg(feature = "aws")]
    pub fn with_aws<I: IntoIterator<Item = (AmazonS3ConfigKey, impl Into<String>)>>(
        mut self,
        configs: I,
    ) -> Self {
        self.aws = Some(
            configs
                .into_iter()
                .map(|(k, v)| (k, v.into()))
                .collect::<Configs<AmazonS3ConfigKey>>(),
        );
        self
    }

    /// Build the `ObjectStore` implementation for AWS.
    ///
    /// # Errors
    /// Returns `PolarsError::ComputeError` when no AWS configuration has been
    /// set, or when the underlying builder reports an error.
    #[cfg(feature = "aws")]
    pub fn build_aws(&self, bucket_name: &str) -> PolarsResult<impl ObjectStore> {
        let options = self
            .aws
            .as_ref()
            .ok_or_else(|| PolarsError::ComputeError("`aws` configuration missing.".into()))?;
        AmazonS3Builder::new()
            // Clone the pairs lazily instead of cloning the whole vector first.
            .try_with_options(options.iter().cloned())
            .and_then(|b| b.with_bucket_name(bucket_name).build())
            .map_err(|e| PolarsError::ComputeError(e.to_string().into()))
    }

    /// Set the configuration for Azure connections. This is the preferred API from Rust.
    ///
    /// Replaces any Azure configuration stored previously and returns the
    /// updated options so calls can be chained.
    #[cfg(feature = "azure")]
    pub fn with_azure<I: IntoIterator<Item = (AzureConfigKey, impl Into<String>)>>(
        mut self,
        configs: I,
    ) -> Self {
        self.azure = Some(
            configs
                .into_iter()
                .map(|(k, v)| (k, v.into()))
                .collect::<Configs<AzureConfigKey>>(),
        );
        self
    }

    /// Build the `ObjectStore` implementation for Azure.
    ///
    /// # Errors
    /// Returns `PolarsError::ComputeError` when no Azure configuration has
    /// been set, or when the underlying builder reports an error.
    #[cfg(feature = "azure")]
    pub fn build_azure(&self, container_name: &str) -> PolarsResult<impl ObjectStore> {
        let options = self
            .azure
            .as_ref()
            .ok_or_else(|| PolarsError::ComputeError("`azure` configuration missing.".into()))?;
        MicrosoftAzureBuilder::new()
            // Clone the pairs lazily instead of cloning the whole vector first.
            .try_with_options(options.iter().cloned())
            .and_then(|b| b.with_container_name(container_name).build())
            .map_err(|e| PolarsError::ComputeError(e.to_string().into()))
    }

    /// Set the configuration for GCP connections. This is the preferred API from Rust.
    ///
    /// Replaces any GCP configuration stored previously and returns the
    /// updated options so calls can be chained.
    #[cfg(feature = "gcp")]
    pub fn with_gcp<I: IntoIterator<Item = (GoogleConfigKey, impl Into<String>)>>(
        mut self,
        configs: I,
    ) -> Self {
        self.gcp = Some(
            configs
                .into_iter()
                .map(|(k, v)| (k, v.into()))
                .collect::<Configs<GoogleConfigKey>>(),
        );
        self
    }

    /// Build the `ObjectStore` implementation for GCP.
    ///
    /// # Errors
    /// Returns `PolarsError::ComputeError` when no GCP configuration has been
    /// set, or when the underlying builder reports an error.
    #[cfg(feature = "gcp")]
    pub fn build_gcp(&self, bucket_name: &str) -> PolarsResult<impl ObjectStore> {
        let options = self
            .gcp
            .as_ref()
            .ok_or_else(|| PolarsError::ComputeError("`gcp` configuration missing.".into()))?;
        GoogleCloudStorageBuilder::new()
            // Clone the pairs lazily instead of cloning the whole vector first.
            .try_with_options(options.iter().cloned())
            .and_then(|b| b.with_bucket_name(bucket_name).build())
            .map_err(|e| PolarsError::ComputeError(e.to_string().into()))
    }

    /// Parse a configuration from string key/value pairs. This is the interface from Python.
    ///
    /// The provider is chosen by parsing `url` into a `CloudType`; `config` is
    /// then converted into that provider's typed configuration. A `file` URL
    /// yields default (empty) options and ignores `config`.
    ///
    /// # Errors
    /// Returns an error when `url` cannot be classified, when the feature for
    /// the selected provider is not enabled, or when `config` contains a key
    /// the provider does not know.
    // `config` is unused when the feature of the selected provider is compiled out.
    #[allow(unused_variables)]
    pub fn from_untyped_config<I: IntoIterator<Item = (impl AsRef<str>, impl Into<String>)>>(
        url: &str,
        config: I,
    ) -> PolarsResult<Self> {
        match CloudType::from_str(url)? {
            CloudType::Aws => {
                #[cfg(feature = "aws")]
                {
                    parsed_untyped_config::<AmazonS3ConfigKey, _>(config)
                        .map(|aws| Self::default().with_aws(aws))
                }
                #[cfg(not(feature = "aws"))]
                {
                    Err(PolarsError::ComputeError(
                        "Feature aws is not enabled.".into(),
                    ))
                }
            }
            CloudType::Azure => {
                #[cfg(feature = "azure")]
                {
                    parsed_untyped_config::<AzureConfigKey, _>(config)
                        .map(|azure| Self::default().with_azure(azure))
                }
                #[cfg(not(feature = "azure"))]
                {
                    // Name the feature that is actually missing (this used to say `gcp`).
                    Err(PolarsError::ComputeError(
                        "Feature azure is not enabled.".into(),
                    ))
                }
            }
            CloudType::File => Ok(Self::default()),
            CloudType::Gcp => {
                #[cfg(feature = "gcp")]
                {
                    parsed_untyped_config::<GoogleConfigKey, _>(config)
                        .map(|gcp| Self::default().with_gcp(gcp))
                }
                #[cfg(not(feature = "gcp"))]
                {
                    Err(PolarsError::ComputeError(
                        "Feature gcp is not enabled.".into(),
                    ))
                }
            }
        }
    }
}
2 changes: 1 addition & 1 deletion polars/polars-core/src/datatypes/any_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -653,7 +653,7 @@ impl PartialOrd for AnyValue<'_> {
(Float64(l), Float64(r)) => l.partial_cmp(r),
(Utf8(l), Utf8(r)) => l.partial_cmp(*r),
#[cfg(feature = "dtype-binary")]
(Binary(l), Binary(r)) => l.partial_cmp(r),
(Binary(l), Binary(r)) => l.partial_cmp(*r),
_ => None,
}
}
Expand Down
3 changes: 3 additions & 0 deletions polars/polars-core/src/datatypes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,3 +260,6 @@ impl PolarsIntegerType for Int64Type {}
pub trait PolarsFloatType: PolarsNumericType {}
impl PolarsFloatType for Float32Type {}
impl PolarsFloatType for Float64Type {}

// Provide options to cloud providers (credentials, region).
pub type CloudOptions = PlHashMap<String, String>;
1 change: 1 addition & 0 deletions polars/polars-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ extern crate core;
#[macro_use]
pub mod utils;
pub mod chunked_array;
pub mod cloud;
pub(crate) mod config;
pub mod datatypes;
#[cfg(feature = "docs")]
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-core/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,4 @@ pub use crate::testing::*;
pub(crate) use crate::utils::CustomIterTools;
pub use crate::utils::IntoVec;
pub use crate::vector_hasher::{VecHash, VecHashSingle};
pub use crate::{datatypes, df};
pub use crate::{cloud, datatypes, df};
7 changes: 5 additions & 2 deletions polars/polars-io/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ dtype-binary = ["polars-core/dtype-binary"]
fmt = ["polars-core/fmt"]
lazy = []
parquet = ["polars-core/parquet", "arrow/io_parquet", "arrow/io_parquet_compression", "memmap"]
parquet-async = ["parquet", "async-trait", "futures", "object_store", "tokio", "url"]
async = ["async-trait", "futures", "object_store", "tokio", "url"]
aws = ["object_store/aws", "async", "polars-core/aws"]
azure = ["object_store/azure", "async", "polars-core/azure"]
gcp = ["object_store/gcp", "async", "polars-core/gcp"]
partition = ["polars-core/partition_by"]
temporal = ["dtype-datetime", "dtype-date", "dtype-time"]
# don't use this
Expand All @@ -51,7 +54,7 @@ lexical-core = { version = "0.8", optional = true }
memchr = "2.5"
memmap = { package = "memmap2", version = "0.5.2", optional = true }
num.workspace = true
object_store = { version = "0.5.2", features = ["aws"], optional = true }
object_store = { version = "0.5.3", default-features = false, optional = true }
once_cell = "1"
polars-arrow = { version = "0.26.1", path = "../polars-arrow" }
polars-core = { version = "0.26.1", path = "../polars-core", features = ["private"], default-features = false }
Expand Down
Loading