Skip to content

Commit

Permalink
feat(rust): generalize the cloud storage builders (#5972)
Browse files Browse the repository at this point in the history
  • Loading branch information
winding-lines authored Jan 11, 2023
1 parent fd1e231 commit 851b9bd
Show file tree
Hide file tree
Showing 37 changed files with 735 additions and 213 deletions.
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,5 @@ features = [
"compute_filter",
"compute_if_then_else",
]

[patch.crates-io]
# packed_simd_2 = { git = "https://github.com/rust-lang/packed_simd", rev = "e57c7ba11386147e6d2cbad7c88f376aab4bdc86" }
2 changes: 1 addition & 1 deletion examples/read_parquet_cloud/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@ edition = "2021"

[dependencies]
aws-creds = "0.34.0"
polars = { path = "../../polars", features = ["lazy", "parquet-async"] }
polars = { path = "../../polars", features = ["lazy", "aws"] }
16 changes: 10 additions & 6 deletions examples/read_parquet_cloud/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::env;

use awscreds::Credentials;
use cloud::AmazonS3ConfigKey as Key;
use polars::prelude::*;

// Login to your aws account and then copy the ../datasets/foods1.parquet file to your own bucket.
Expand All @@ -9,11 +8,16 @@ const TEST_S3: &str = "s3://lov2test/polars/datasets/*.parquet";

fn main() -> PolarsResult<()> {
let cred = Credentials::default().unwrap();
env::set_var("AWS_SECRET_ACCESS_KEY", cred.secret_key.unwrap());
env::set_var("AWS_DEFAULT_REGION", "us-west-2");
env::set_var("AWS_ACCESS_KEY_ID", cred.access_key.unwrap());

let df = LazyFrame::scan_parquet(TEST_S3, ScanArgsParquet::default())?
// Propagate the credentials and other cloud options.
let mut args = ScanArgsParquet::default();
let cloud_options = cloud::CloudOptions::default().with_aws([
(Key::AccessKeyId, &cred.access_key.unwrap()),
(Key::SecretAccessKey, &cred.secret_key.unwrap()),
(Key::Region, &"us-west-2".into()),
]);
args.cloud_options = Some(cloud_options);
let df = LazyFrame::scan_parquet(TEST_S3, args)?
.with_streaming(true)
.select([
// select all columns
Expand Down
5 changes: 4 additions & 1 deletion polars/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ ndarray = ["polars-core/ndarray"]
serde = ["polars-core/serde"]
serde-lazy = ["polars-core/serde-lazy", "polars-lazy/serde", "polars-time/serde", "polars-io/serde", "polars-ops/serde"]
parquet = ["polars-io", "polars-core/parquet", "polars-lazy/parquet", "polars-io/parquet"]
parquet-async = ["parquet", "polars-io", "polars-core/parquet", "polars-lazy/parquet-async", "polars-io/parquet"]
async = ["polars-lazy/async"]
aws = ["async", "polars-io/aws"]
azure = ["async", "polars-io/azure"]
gcp = ["async", "polars-io/gcp"]
lazy = ["polars-core/lazy", "polars-lazy", "polars-lazy/compile"]
# commented out until UB is fixed
# parallel = ["polars-core/parallel"]
Expand Down
8 changes: 8 additions & 0 deletions polars/polars-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,12 @@ docs-selection = [
"partition_by",
]

# Cloud support.
"async" = ["url"]
"aws" = ["async", "object_store/aws"]
"azure" = ["async", "object_store/azure"]
"gcp" = ["async", "object_store/gcp"]

[dependencies]
ahash.workspace = true
anyhow.workspace = true
Expand All @@ -161,6 +167,7 @@ hex = { version = "0.4", optional = true }
indexmap = { version = "1", features = ["std"] }
ndarray = { version = "0.15", optional = true, default_features = false }
num.workspace = true
object_store = { version = "0.5.3", default-features = false, optional = true }
once_cell = "1"
polars-arrow = { version = "0.26.1", path = "../polars-arrow", features = ["compute"] }
polars-utils = { version = "0.26.1", path = "../polars-utils" }
Expand All @@ -173,6 +180,7 @@ serde = { version = "1", features = ["derive"], optional = true }
serde_json = { version = "1", optional = true }
smartstring = { version = "1" }
thiserror.workspace = true
url = { version = "2.3.1", optional = true }
xxhash-rust.workspace = true

[target.'cfg(target_family = "wasm")'.dependencies]
Expand Down
234 changes: 234 additions & 0 deletions polars/polars-core/src/cloud.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
use std::str::FromStr;

#[cfg(feature = "aws")]
use object_store::aws::AmazonS3Builder;
#[cfg(feature = "aws")]
pub use object_store::aws::AmazonS3ConfigKey;
#[cfg(feature = "azure")]
pub use object_store::azure::AzureConfigKey;
#[cfg(feature = "azure")]
use object_store::azure::MicrosoftAzureBuilder;
#[cfg(feature = "gcp")]
use object_store::gcp::GoogleCloudStorageBuilder;
#[cfg(feature = "gcp")]
pub use object_store::gcp::GoogleConfigKey;
#[cfg(feature = "async")]
use object_store::ObjectStore;
#[cfg(feature = "serde-lazy")]
use serde::{Deserialize, Serialize};
#[cfg(feature = "async")]
use url::Url;

use crate::error::{PolarsError, PolarsResult};

/// Storage type for the per-provider configuration held in `CloudOptions`.
///
/// The type of the config keys must satisfy the following requirements:
/// 1. must be easily collected into a HashMap, the type required by the `object_store` crate API.
/// 2. be serializable, required when the `serde-lazy` feature is enabled.
/// 3. not actually use HashMap, since that type is disallowed in Polars for performance reasons.
///
/// Currently this type is a vector of (config key, config value) pairs, where `T` is the
/// provider-specific key type and the value is always a `String`.
// Every user of this alias is behind a cloud feature flag, so the alias is unused when
// none of those features is enabled.
#[allow(dead_code)]
type Configs<T> = Vec<(T, String)>;

#[derive(Clone, Debug, Default)]
#[cfg_attr(feature = "serde-lazy", derive(Serialize, Deserialize))]
/// Options to connect to various cloud providers.
///
/// Each provider has its own optional list of (key, value) configuration pairs. A field
/// only exists when the matching cargo feature (`aws`, `azure`, `gcp`) is enabled, and it
/// is `None` until the corresponding `with_*` method has been called.
pub struct CloudOptions {
// Configuration pairs for AWS S3; `None` when not configured.
#[cfg(feature = "aws")]
aws: Option<Configs<AmazonS3ConfigKey>>,
// Configuration pairs for Microsoft Azure; `None` when not configured.
#[cfg(feature = "azure")]
azure: Option<Configs<AzureConfigKey>>,
// Configuration pairs for Google Cloud Storage; `None` when not configured.
#[cfg(feature = "gcp")]
gcp: Option<Configs<GoogleConfigKey>>,
}

/// Convert string-keyed configuration pairs into pairs keyed by the typed configuration key `T`.
///
/// Every key is parsed with `T::from_str` and every value is converted into a `String`.
///
/// # Errors
///
/// Returns a `PolarsError::ComputeError` naming the offending key as soon as one key
/// cannot be parsed into `T`; pairs after that key are not processed.
// Only called from feature-gated code paths, so it is unused when no cloud feature is enabled.
#[allow(dead_code)]
fn parsed_untyped_config<T, I: IntoIterator<Item = (impl AsRef<str>, impl Into<String>)>>(
    config: I,
) -> PolarsResult<Configs<T>>
where
    T: FromStr + std::cmp::Eq + std::hash::Hash,
{
    let mut typed: Configs<T> = Vec::new();
    for (raw_key, raw_value) in config {
        let name = raw_key.as_ref();
        match T::from_str(name) {
            Ok(typed_key) => typed.push((typed_key, raw_value.into())),
            // The parser's own error is dropped on purpose; only the key name is reported.
            Err(_) => {
                return Err(PolarsError::ComputeError(
                    format!("Unknown configuration key {}.", name).into(),
                ))
            }
        }
    }
    Ok(typed)
}

/// The kind of storage backend a url points to.
///
/// Values are obtained by parsing a url with `CloudType::from_str`, which selects the
/// variant from the url scheme.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CloudType {
    /// Amazon S3, selected by the `s3` scheme.
    Aws,
    /// Microsoft Azure storage, selected by the `az`, `adl` and `abfs` schemes.
    Azure,
    /// Local file, selected by the `file` scheme.
    File,
    /// Google Cloud Storage, selected by the `gs` and `gcp` schemes.
    Gcp,
}

impl FromStr for CloudType {
    type Err = PolarsError;

    /// Determine the storage backend from the scheme of `url`.
    ///
    /// # Errors
    ///
    /// Fails when `url` cannot be parsed as a url, or when its scheme is not one of
    /// `s3`, `az`, `adl`, `abfs`, `gs`, `gcp` or `file`.
    #[cfg(feature = "async")]
    fn from_str(url: &str) -> Result<Self, Self::Err> {
        let location = Url::parse(url).map_err(anyhow::Error::from)?;
        let cloud_type = match location.scheme() {
            "s3" => Self::Aws,
            "az" | "adl" | "abfs" => Self::Azure,
            "gs" | "gcp" => Self::Gcp,
            "file" => Self::File,
            _ => return Err(PolarsError::ComputeError("Unknown url scheme.".into())),
        };
        Ok(cloud_type)
    }

    /// Fallback used when the `async` feature is disabled: always returns an error,
    /// regardless of the input.
    #[cfg(not(feature = "async"))]
    fn from_str(_s: &str) -> Result<Self, Self::Err> {
        let message = "At least one of the cloud features must be enabled.";
        Err(PolarsError::ComputeError(message.into()))
    }
}

impl CloudOptions {
    /// Set the configuration for AWS connections. This is the preferred API from rust.
    ///
    /// Any AWS configuration stored previously is replaced by `configs`.
    #[cfg(feature = "aws")]
    pub fn with_aws<I: IntoIterator<Item = (AmazonS3ConfigKey, impl Into<String>)>>(
        mut self,
        configs: I,
    ) -> Self {
        self.aws = Some(
            configs
                .into_iter()
                .map(|(k, v)| (k, v.into()))
                .collect::<Configs<AmazonS3ConfigKey>>(),
        );
        self
    }

    /// Build the ObjectStore implementation for AWS.
    ///
    /// # Errors
    ///
    /// Returns a `ComputeError` when no AWS configuration has been set, or when the
    /// underlying builder rejects the options or fails to build the store.
    #[cfg(feature = "aws")]
    pub fn build_aws(&self, bucket_name: &str) -> PolarsResult<impl ObjectStore> {
        let options = self
            .aws
            .as_ref()
            .ok_or_else(|| PolarsError::ComputeError("`aws` configuration missing.".into()))?;
        AmazonS3Builder::new()
            .try_with_options(options.clone().into_iter())
            .and_then(|b| b.with_bucket_name(bucket_name).build())
            .map_err(|e| PolarsError::ComputeError(e.to_string().into()))
    }

    /// Set the configuration for Azure connections. This is the preferred API from rust.
    ///
    /// Any Azure configuration stored previously is replaced by `configs`.
    #[cfg(feature = "azure")]
    pub fn with_azure<I: IntoIterator<Item = (AzureConfigKey, impl Into<String>)>>(
        mut self,
        configs: I,
    ) -> Self {
        self.azure = Some(
            configs
                .into_iter()
                .map(|(k, v)| (k, v.into()))
                .collect::<Configs<AzureConfigKey>>(),
        );
        self
    }

    /// Build the ObjectStore implementation for Azure.
    ///
    /// # Errors
    ///
    /// Returns a `ComputeError` when no Azure configuration has been set, or when the
    /// underlying builder rejects the options or fails to build the store.
    #[cfg(feature = "azure")]
    pub fn build_azure(&self, container_name: &str) -> PolarsResult<impl ObjectStore> {
        let options = self
            .azure
            .as_ref()
            .ok_or_else(|| PolarsError::ComputeError("`azure` configuration missing.".into()))?;
        MicrosoftAzureBuilder::new()
            .try_with_options(options.clone().into_iter())
            .and_then(|b| b.with_container_name(container_name).build())
            .map_err(|e| PolarsError::ComputeError(e.to_string().into()))
    }

    /// Set the configuration for GCP connections. This is the preferred API from rust.
    ///
    /// Any GCP configuration stored previously is replaced by `configs`.
    #[cfg(feature = "gcp")]
    pub fn with_gcp<I: IntoIterator<Item = (GoogleConfigKey, impl Into<String>)>>(
        mut self,
        configs: I,
    ) -> Self {
        self.gcp = Some(
            configs
                .into_iter()
                .map(|(k, v)| (k, v.into()))
                .collect::<Configs<GoogleConfigKey>>(),
        );
        self
    }

    /// Build the ObjectStore implementation for GCP.
    ///
    /// # Errors
    ///
    /// Returns a `ComputeError` when no GCP configuration has been set, or when the
    /// underlying builder rejects the options or fails to build the store.
    #[cfg(feature = "gcp")]
    pub fn build_gcp(&self, bucket_name: &str) -> PolarsResult<impl ObjectStore> {
        let options = self
            .gcp
            .as_ref()
            .ok_or_else(|| PolarsError::ComputeError("`gcp` configuration missing.".into()))?;
        GoogleCloudStorageBuilder::new()
            .try_with_options(options.clone().into_iter())
            .and_then(|b| b.with_bucket_name(bucket_name).build())
            .map_err(|e| PolarsError::ComputeError(e.to_string().into()))
    }

    /// Parse a configuration from a Hashmap. This is the interface from Python.
    ///
    /// The provider is chosen from the scheme of `url` (see `CloudType::from_str`), and the
    /// string keys in `config` are parsed into that provider's typed configuration keys.
    /// For `file` urls `config` is ignored and default options are returned.
    ///
    /// # Errors
    ///
    /// Returns a `ComputeError` when the url scheme is unknown, when the cargo feature for
    /// the selected provider is not enabled, or when a key in `config` is not a valid
    /// configuration key for that provider.
    // `config` is unused in every branch whose provider feature is disabled.
    #[allow(unused_variables)]
    pub fn from_untyped_config<I: IntoIterator<Item = (impl AsRef<str>, impl Into<String>)>>(
        url: &str,
        config: I,
    ) -> PolarsResult<Self> {
        match CloudType::from_str(url)? {
            CloudType::Aws => {
                #[cfg(feature = "aws")]
                {
                    parsed_untyped_config::<AmazonS3ConfigKey, _>(config)
                        .map(|aws| Self::default().with_aws(aws))
                }
                #[cfg(not(feature = "aws"))]
                {
                    Err(PolarsError::ComputeError(
                        "Feature aws is not enabled.".into(),
                    ))
                }
            }
            CloudType::Azure => {
                #[cfg(feature = "azure")]
                {
                    parsed_untyped_config::<AzureConfigKey, _>(config)
                        .map(|azure| Self::default().with_azure(azure))
                }
                #[cfg(not(feature = "azure"))]
                {
                    // Report the feature that is actually missing for this branch.
                    Err(PolarsError::ComputeError(
                        "Feature azure is not enabled.".into(),
                    ))
                }
            }
            CloudType::File => Ok(Self::default()),
            CloudType::Gcp => {
                #[cfg(feature = "gcp")]
                {
                    parsed_untyped_config::<GoogleConfigKey, _>(config)
                        .map(|gcp| Self::default().with_gcp(gcp))
                }
                #[cfg(not(feature = "gcp"))]
                {
                    Err(PolarsError::ComputeError(
                        "Feature gcp is not enabled.".into(),
                    ))
                }
            }
        }
    }
}
2 changes: 1 addition & 1 deletion polars/polars-core/src/datatypes/any_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -653,7 +653,7 @@ impl PartialOrd for AnyValue<'_> {
(Float64(l), Float64(r)) => l.partial_cmp(r),
(Utf8(l), Utf8(r)) => l.partial_cmp(*r),
#[cfg(feature = "dtype-binary")]
(Binary(l), Binary(r)) => l.partial_cmp(r),
(Binary(l), Binary(r)) => l.partial_cmp(*r),
_ => None,
}
}
Expand Down
3 changes: 3 additions & 0 deletions polars/polars-core/src/datatypes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,3 +260,6 @@ impl PolarsIntegerType for Int64Type {}
pub trait PolarsFloatType: PolarsNumericType {}
impl PolarsFloatType for Float32Type {}
impl PolarsFloatType for Float64Type {}

// Provide options to cloud providers (credentials, region).
pub type CloudOptions = PlHashMap<String, String>;
1 change: 1 addition & 0 deletions polars/polars-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ extern crate core;
#[macro_use]
pub mod utils;
pub mod chunked_array;
pub mod cloud;
pub(crate) mod config;
pub mod datatypes;
#[cfg(feature = "docs")]
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-core/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,4 @@ pub use crate::testing::*;
pub(crate) use crate::utils::CustomIterTools;
pub use crate::utils::IntoVec;
pub use crate::vector_hasher::{VecHash, VecHashSingle};
pub use crate::{datatypes, df};
pub use crate::{cloud, datatypes, df};
7 changes: 5 additions & 2 deletions polars/polars-io/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ dtype-binary = ["polars-core/dtype-binary"]
fmt = ["polars-core/fmt"]
lazy = []
parquet = ["polars-core/parquet", "arrow/io_parquet", "arrow/io_parquet_compression", "memmap"]
parquet-async = ["parquet", "async-trait", "futures", "object_store", "tokio", "url"]
async = ["async-trait", "futures", "object_store", "tokio", "url"]
aws = ["object_store/aws", "async", "polars-core/aws"]
azure = ["object_store/azure", "async", "polars-core/azure"]
gcp = ["object_store/gcp", "async", "polars-core/gcp"]
partition = ["polars-core/partition_by"]
temporal = ["dtype-datetime", "dtype-date", "dtype-time"]
# don't use this
Expand All @@ -51,7 +54,7 @@ lexical-core = { version = "0.8", optional = true }
memchr = "2.5"
memmap = { package = "memmap2", version = "0.5.2", optional = true }
num.workspace = true
object_store = { version = "0.5.2", features = ["aws"], optional = true }
object_store = { version = "0.5.3", default-features = false, optional = true }
once_cell = "1"
polars-arrow = { version = "0.26.1", path = "../polars-arrow" }
polars-core = { version = "0.26.1", path = "../polars-core", features = ["private"], default-features = false }
Expand Down
Loading

0 comments on commit 851b9bd

Please sign in to comment.