feat(rust): generalize the cloud storage builders
winding-lines committed Jan 3, 2023
1 parent e3d0e83 commit 3e098e7
Showing 39 changed files with 551 additions and 213 deletions.
5 changes: 5 additions & 0 deletions Cargo.toml
@@ -51,6 +51,11 @@ features = [
"compute_filter",
"compute_if_then_else",
]
[workspace.dependencies.object_store]
package = "object_store"
git = "https://github.com/winding-lines/arrow-rs"
rev = "8f87256eaac64bdc9d7c2a72478b29dd9f123c9f"
default-features = false

[patch.crates-io]
# packed_simd_2 = { git = "https://github.com/rust-lang/packed_simd", rev = "e57c7ba11386147e6d2cbad7c88f376aab4bdc86" }
2 changes: 1 addition & 1 deletion examples/read_parquet_cloud/Cargo.toml
@@ -7,4 +7,4 @@ edition = "2021"

[dependencies]
aws-creds = "0.34.0"
polars = { path = "../../polars", features = ["lazy", "parquet-async"] }
polars = { path = "../../polars", features = ["lazy", "aws"] }
16 changes: 10 additions & 6 deletions examples/read_parquet_cloud/src/main.rs
@@ -1,5 +1,3 @@
use std::env;

use awscreds::Credentials;
use polars::prelude::*;

@@ -9,11 +7,17 @@ const TEST_S3: &str = "s3://lov2test/polars/datasets/*.parquet";

fn main() -> PolarsResult<()> {
let cred = Credentials::default().unwrap();
env::set_var("AWS_SECRET_ACCESS_KEY", cred.secret_key.unwrap());
env::set_var("AWS_DEFAULT_REGION", "us-west-2");
env::set_var("AWS_ACCESS_KEY_ID", cred.access_key.unwrap());

let df = LazyFrame::scan_parquet(TEST_S3, ScanArgsParquet::default())?
// Propagate the credentials and other cloud options.
let mut args = ScanArgsParquet::default();
let cloud_options = cloud_options::CloudOptions::default().with_aws(
cloud_options::AmazonS3Builder::new()
.with_secret_access_key(cred.secret_key.unwrap())
.with_region("us-west-2")
.with_access_key_id(cred.access_key.unwrap()),
);
args.cloud_options = Some(cloud_options);
let df = LazyFrame::scan_parquet(TEST_S3, args)?
.with_streaming(true)
.select([
// select all columns
5 changes: 4 additions & 1 deletion polars/Cargo.toml
@@ -31,7 +31,10 @@ ndarray = ["polars-core/ndarray"]
serde = ["polars-core/serde"]
serde-lazy = ["polars-core/serde-lazy", "polars-lazy/serde", "polars-time/serde", "polars-io/serde", "polars-ops/serde"]
parquet = ["polars-io", "polars-core/parquet", "polars-lazy/parquet", "polars-io/parquet"]
parquet-async = ["parquet", "polars-io", "polars-core/parquet", "polars-lazy/parquet-async", "polars-io/parquet"]
async = ["polars-lazy/async"]
aws = ["async", "polars-io/aws"]
azure = ["async", "polars-io/azure"]
gcp = ["async", "polars-io/gcp"]
lazy = ["polars-core/lazy", "polars-lazy", "polars-lazy/compile"]
# commented out until UB is fixed
# parallel = ["polars-core/parallel"]
6 changes: 6 additions & 0 deletions polars/polars-core/Cargo.toml
@@ -147,6 +147,11 @@ docs-selection = [
"partition_by",
]

# Cloud support.
"aws" = ["object_store/aws"]
"azure" = ["object_store/azure"]
"gcp" = ["object_store/gcp"]

[dependencies]
ahash.workspace = true
anyhow.workspace = true
@@ -161,6 +166,7 @@ hex = { version = "0.4", optional = true }
indexmap = { version = "1", features = ["std"] }
ndarray = { version = "0.15", optional = true, default_features = false }
num.workspace = true
object_store.workspace = true
once_cell = "1"
polars-arrow = { version = "0.26.1", path = "../polars-arrow", features = ["compute"] }
polars-utils = { version = "0.26.1", path = "../polars-utils" }
38 changes: 38 additions & 0 deletions polars/polars-core/src/cloud_options.rs
@@ -0,0 +1,38 @@
#[cfg(feature = "aws")]
pub use object_store::aws::AmazonS3Builder;
#[cfg(feature = "azure")]
pub use object_store::azure::MicrosoftAzureBuilder;
#[cfg(feature = "gcp")]
pub use object_store::gcp::GoogleCloudStorageBuilder;
#[cfg(feature = "serde-lazy")]
use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, Default)]
#[cfg_attr(feature = "serde-lazy", derive(Serialize, Deserialize))]
/// Options to connect to various cloud providers.
pub struct CloudOptions {
#[cfg(feature = "aws")]
pub aws: Option<AmazonS3Builder>,
#[cfg(feature = "azure")]
pub azure: Option<MicrosoftAzureBuilder>,
#[cfg(feature = "gcp")]
pub gcp: Option<GoogleCloudStorageBuilder>,
}

impl CloudOptions {
#[cfg(feature = "aws")]
pub fn with_aws(mut self, aws: AmazonS3Builder) -> Self {
self.aws = Some(aws);
self
}
#[cfg(feature="azure")]
pub fn with_azure(mut self, azure: MicrosoftAzureBuilder) -> Self {
self.azure = Some(azure);
self
}
#[cfg(feature = "gcp")]
pub fn with_gcp(mut self, gcp: GoogleCloudStorageBuilder) -> Self {
self.gcp = Some(gcp);
self
}
}
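
The new module simply re-exports and wraps the `object_store` builders, so the other providers follow the same pattern as the AWS example above. A minimal sketch for GCP, assuming the `lazy`, `parquet`, and `gcp` features are enabled on `polars`; the function name, bucket, and service-account path below are placeholders for illustration only:

use polars::prelude::*;

fn gcp_scan_args() -> ScanArgsParquet {
    // Hypothetical bucket and service-account path, for illustration only.
    let cloud_options = cloud_options::CloudOptions::default().with_gcp(
        cloud_options::GoogleCloudStorageBuilder::new()
            .with_bucket_name("my-bucket")
            .with_service_account_path("/path/to/service-account.json"),
    );
    let mut args = ScanArgsParquet::default();
    args.cloud_options = Some(cloud_options);
    args
}
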
2 changes: 1 addition & 1 deletion polars/polars-core/src/datatypes/any_value.rs
@@ -653,7 +653,7 @@ impl PartialOrd for AnyValue<'_> {
(Float64(l), Float64(r)) => l.partial_cmp(r),
(Utf8(l), Utf8(r)) => l.partial_cmp(*r),
#[cfg(feature = "dtype-binary")]
(Binary(l), Binary(r)) => l.partial_cmp(r),
(Binary(l), Binary(r)) => l.partial_cmp(*r),
_ => None,
}
}
3 changes: 3 additions & 0 deletions polars/polars-core/src/datatypes/mod.rs
@@ -260,3 +260,6 @@ impl PolarsIntegerType for Int64Type {}
pub trait PolarsFloatType: PolarsNumericType {}
impl PolarsFloatType for Float32Type {}
impl PolarsFloatType for Float64Type {}

// Provide options to cloud providers (credentials, region).
pub type CloudOptions = PlHashMap<String, String>;
1 change: 1 addition & 0 deletions polars/polars-core/src/lib.rs
@@ -4,6 +4,7 @@ extern crate core;
#[macro_use]
pub mod utils;
pub mod chunked_array;
pub mod cloud_options;
pub(crate) mod config;
pub mod datatypes;
#[cfg(feature = "docs")]
2 changes: 1 addition & 1 deletion polars/polars-core/src/prelude.rs
@@ -51,4 +51,4 @@ pub use crate::testing::*;
pub(crate) use crate::utils::CustomIterTools;
pub use crate::utils::IntoVec;
pub use crate::vector_hasher::{VecHash, VecHashSingle};
pub use crate::{datatypes, df};
pub use crate::{cloud_options, datatypes, df};
7 changes: 5 additions & 2 deletions polars/polars-io/Cargo.toml
@@ -31,7 +31,10 @@ dtype-binary = ["polars-core/dtype-binary"]
fmt = ["polars-core/fmt"]
lazy = []
parquet = ["polars-core/parquet", "arrow/io_parquet", "arrow/io_parquet_compression", "memmap"]
parquet-async = ["parquet", "async-trait", "futures", "object_store", "tokio", "url"]
async = ["async-trait", "futures", "tokio", "url"]
aws = ["object_store/aws", "async", "polars-core/aws"]
azure = ["object_store/azure", "async", "polars-core/azure"]
gcp = ["object_store/gcp", "async", "polars-core/gcp"]
partition = ["polars-core/partition_by"]
temporal = ["dtype-datetime", "dtype-date", "dtype-time"]
# don't use this
@@ -51,7 +54,7 @@ lexical-core = { version = "0.8", optional = true }
memchr = "2.5"
memmap = { package = "memmap2", version = "0.5.2", optional = true }
num.workspace = true
object_store = { version = "0.5.2", features = ["aws"], optional = true }
object_store.workspace = true
once_cell = "1"
polars-arrow = { version = "0.26.1", path = "../polars-arrow" }
polars-core = { version = "0.26.1", path = "../polars-core", features = ["private"], default-features = false }
131 changes: 131 additions & 0 deletions polars/polars-io/src/cloud/adaptors.rs
@@ -0,0 +1,131 @@
//! Interface with the object_store crate and define AsyncSeek, AsyncRead.
//! This is used, for example, by the parquet2 crate.
use std::io::{self};
use std::pin::Pin;
use std::sync::Arc;
use std::task::Poll;

use futures::executor::block_on;
use futures::future::BoxFuture;
use futures::lock::Mutex;
use futures::{AsyncRead, AsyncSeek, Future, TryFutureExt};
use object_store::path::Path;
use object_store::ObjectStore;

type OptionalFuture = Arc<Mutex<Option<BoxFuture<'static, std::io::Result<Vec<u8>>>>>>;

/// Adaptor to translate from AsyncSeek and AsyncRead to the object_store get_range API.
pub struct CloudReader {
// The current position in the stream; it is set by seeking and updated by reading bytes.
pos: u64,
// The total size of the object is required when seeking from the end of the file.
length: Option<u64>,
// Hold a reference to the store in a thread-safe way.
object_store: Arc<Mutex<Box<dyn ObjectStore>>>,
// The path in the object_store of the current object being read.
path: Path,
// If a read is pending then `active` will point to its future.
active: OptionalFuture,
}

impl CloudReader {
pub fn new(
length: Option<u64>,
object_store: Arc<Mutex<Box<dyn ObjectStore>>>,
path: Path,
) -> Self {
Self {
pos: 0,
length,
object_store,
path,
active: Arc::new(Mutex::new(None)),
}
}

/// For each read request we create a new future.
async fn read_operation(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
length: usize,
) -> std::task::Poll<std::io::Result<Vec<u8>>> {
let start = self.pos as usize;

// If we already have a future just poll it.
if let Some(fut) = self.active.lock().await.as_mut() {
return Future::poll(fut.as_mut(), cx);
}

// Create the future.
let future = {
let path = self.path.clone();
let arc = self.object_store.clone();
// Use an async move block to get our owned objects.
async move {
let object_store = arc.lock().await;
object_store
.get_range(&path, start..start + length)
.map_ok(|r| r.to_vec())
.map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::Other,
format!("object store error {e:?}"),
)
})
.await
}
};
// Prepare for next read.
self.pos += length as u64;

let mut future = Box::pin(future);

// Need to poll it once to get the pump going.
let polled = Future::poll(future.as_mut(), cx);

// Save for next time.
let mut state = self.active.lock().await;
*state = Some(future);
polled
}
}

impl AsyncRead for CloudReader {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &mut [u8],
) -> std::task::Poll<std::io::Result<usize>> {
// Use block_on in order to get the future result in this thread and copy the data into the output buffer.
// With this approach we keep ownership of the buffer and we don't have to pass it to the future runtime.
match block_on(self.read_operation(cx, buf.len())) {
Poll::Ready(Ok(bytes)) => {
buf.copy_from_slice(&bytes);
Poll::Ready(Ok(bytes.len()))
}
Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
Poll::Pending => Poll::Pending,
}
}
}

impl AsyncSeek for CloudReader {
fn poll_seek(
mut self: Pin<&mut Self>,
_: &mut std::task::Context<'_>,
pos: io::SeekFrom,
) -> std::task::Poll<std::io::Result<u64>> {
match pos {
io::SeekFrom::Start(pos) => self.pos = pos,
io::SeekFrom::End(pos) => {
let length = self.length.ok_or::<io::Error>(io::Error::new(
std::io::ErrorKind::Other,
"Cannot seek from end of stream when length is unknown.",
))?;
self.pos = (length as i64 + pos) as u64
}
io::SeekFrom::Current(pos) => self.pos = (self.pos as i64 + pos) as u64,
};
std::task::Poll::Ready(Ok(self.pos))
}
}
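
CloudReader is the adapter the async readers can hand to parquet2 as a byte source. A minimal construction sketch, assuming the `aws` feature and that the module is exported as `polars_io::cloud::adaptors` (not shown in this diff); the function name, bucket, and object key are placeholders, and the store would normally come from the builders introduced above:

use std::sync::Arc;

use futures::lock::Mutex;
use object_store::aws::AmazonS3Builder;
use object_store::path::Path;
use object_store::ObjectStore;
use polars_io::cloud::adaptors::CloudReader;

fn cloud_reader(key: &str) -> object_store::Result<CloudReader> {
    // Hypothetical bucket; credentials are picked up from the environment here.
    let store: Box<dyn ObjectStore> = Box::new(
        AmazonS3Builder::from_env()
            .with_bucket_name("my-bucket")
            .build()?,
    );
    // The reader needs a shared handle to the store, the object path, and
    // (optionally) the total length so that SeekFrom::End can be resolved.
    Ok(CloudReader::new(
        None,
        Arc::new(Mutex::new(store)),
        Path::from(key),
    ))
}
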