From bf7f204a6e7abce07b3da938637162cc8c457d93 Mon Sep 17 00:00:00 2001
From: Jay Chia
Date: Wed, 3 Jul 2024 16:07:08 -0700
Subject: [PATCH] Minor cleanup to use Bytes instead of Vec<u8>

---
 src/daft-io/src/azure_blob.rs   |  2 +-
 src/daft-io/src/google_cloud.rs |  2 +-
 src/daft-io/src/http.rs         |  2 +-
 src/daft-io/src/lib.rs          | 30 +++++++++++++++++++-----------
 src/daft-io/src/local.rs        |  2 +-
 src/daft-io/src/object_io.rs    |  2 +-
 src/daft-io/src/s3_like.rs      |  4 ++--
 7 files changed, 26 insertions(+), 18 deletions(-)

diff --git a/src/daft-io/src/azure_blob.rs b/src/daft-io/src/azure_blob.rs
index 16b9d817f3..bc087abc9c 100644
--- a/src/daft-io/src/azure_blob.rs
+++ b/src/daft-io/src/azure_blob.rs
@@ -532,7 +532,7 @@ impl ObjectSource for AzureBlobSource {
     async fn put(
         &self,
         _uri: &str,
-        _data: Vec<u8>,
+        _data: bytes::Bytes,
         _io_stats: Option<IOStatsRef>,
     ) -> super::Result<()> {
         todo!();
diff --git a/src/daft-io/src/google_cloud.rs b/src/daft-io/src/google_cloud.rs
index a182081bf5..d1db0bf231 100644
--- a/src/daft-io/src/google_cloud.rs
+++ b/src/daft-io/src/google_cloud.rs
@@ -419,7 +419,7 @@ impl ObjectSource for GCSSource {
     async fn put(
         &self,
         _uri: &str,
-        _data: Vec<u8>,
+        _data: bytes::Bytes,
         _io_stats: Option<IOStatsRef>,
     ) -> super::Result<()> {
         todo!();
diff --git a/src/daft-io/src/http.rs b/src/daft-io/src/http.rs
index 08104d5d67..b7641ab2ea 100644
--- a/src/daft-io/src/http.rs
+++ b/src/daft-io/src/http.rs
@@ -234,7 +234,7 @@ impl ObjectSource for HttpSource {
     async fn put(
         &self,
         _uri: &str,
-        _data: Vec<u8>,
+        _data: bytes::Bytes,
         _io_stats: Option<IOStatsRef>,
     ) -> super::Result<()> {
         todo!();
diff --git a/src/daft-io/src/lib.rs b/src/daft-io/src/lib.rs
index 1c050d871f..1e0f192681 100644
--- a/src/daft-io/src/lib.rs
+++ b/src/daft-io/src/lib.rs
@@ -257,7 +257,7 @@ impl IOClient {
     pub async fn single_url_put(
         &self,
         dest: &str,
-        data: Vec<u8>,
+        data: bytes::Bytes,
         io_stats: Option<IOStatsRef>,
     ) -> Result<()> {
         let (scheme, dest) = parse_url(dest)?;
@@ -313,7 +313,7 @@ impl IOClient {
         &self,
         index: usize,
         dest: String,
-        data: Option<Vec<u8>>,
+        data: Option<bytes::Bytes>,
         io_stats: Option<IOStatsRef>,
     ) -> Result<Option<String>> {
         let value = if let Some(data) = data {
@@ -601,17 +601,27 @@ pub fn url_download(
     }
 }
 
+/// Uploads data from a Binary/FixedSizeBinary/Utf8 Series to the provided folder_path
+///
+/// This performs an async upload of each row, and creates in-memory copies of the data that is currently in-flight.
+/// Memory consumption should be tunable by configuring `max_connections`, which tunes the number of in-flight tokio tasks.
 pub fn upload_to_folder(
     series: &Series,
     folder_path: &str,
     max_connections: usize,
     multi_thread: bool,
     config: Arc<IOConfig>,
     io_stats: Option<IOStatsRef>,
 ) -> DaftResult<Series> {
     fn _upload_bytes_to_folder(
         folder_path: &str,
-        bytes_iter: impl Iterator<Item = Option<Vec<u8>>>,
+        // TODO: We can further optimize this for larger rows by using instead an Iterator<Item = bytes::Bytes>
+        // This would allow us to iteratively copy smaller chunks of data and feed it to the AWS SDKs, instead
+        // of materializing the entire row at once as a single bytes::Bytes.
+        //
+        // Alternatively, we can find a way of creating a `bytes::Bytes` that just references the underlying
+        // arrow2 buffer, without making a copy. This would be the ideal case.
+        bytes_iter: impl Iterator<Item = Option<bytes::Bytes>>,
         max_connections: usize,
         multi_thread: bool,
         config: Arc<IOConfig>,
@@ -678,7 +688,7 @@ pub fn upload_to_folder(
             .unwrap()
             .as_arrow()
             .iter()
-            .map(|bytes_slice| bytes_slice.map(|b| b.to_vec()));
+            .map(|bytes_slice| bytes_slice.map(|b| bytes::Bytes::from(b.to_vec())));
         _upload_bytes_to_folder(
             folder_path,
             bytes_iter,
@@ -694,7 +704,7 @@ pub fn upload_to_folder(
             .unwrap()
             .as_arrow()
             .iter()
-            .map(|bytes_slice| bytes_slice.map(|b| b.to_vec()));
+            .map(|bytes_slice| bytes_slice.map(|b| bytes::Bytes::from(b.to_vec())));
         _upload_bytes_to_folder(
             folder_path,
             bytes_iter,
@@ -705,12 +715,10 @@ pub fn upload_to_folder(
             )
         }
         DataType::Utf8 => {
-            let bytes_iter = series
-                .utf8()
-                .unwrap()
-                .as_arrow()
-                .iter()
-                .map(|utf8_slice| utf8_slice.map(|s| s.as_bytes().to_vec()));
+            let bytes_iter =
+                series.utf8().unwrap().as_arrow().iter().map(|utf8_slice| {
+                    utf8_slice.map(|s| bytes::Bytes::from(s.as_bytes().to_vec()))
+                });
             _upload_bytes_to_folder(
                 folder_path,
                 bytes_iter,
diff --git a/src/daft-io/src/local.rs b/src/daft-io/src/local.rs
index ff7b0b9185..8963f05ab3 100644
--- a/src/daft-io/src/local.rs
+++ b/src/daft-io/src/local.rs
@@ -150,7 +150,7 @@ impl ObjectSource for LocalSource {
     async fn put(
         &self,
         uri: &str,
-        data: Vec<u8>,
+        data: bytes::Bytes,
         _io_stats: Option<IOStatsRef>,
     ) -> super::Result<()> {
         const LOCAL_PROTOCOL: &str = "file://";
diff --git a/src/daft-io/src/object_io.rs b/src/daft-io/src/object_io.rs
index 1e3f4764cb..295c10cfab 100644
--- a/src/daft-io/src/object_io.rs
+++ b/src/daft-io/src/object_io.rs
@@ -182,7 +182,7 @@ pub(crate) trait ObjectSource: Sync + Send {
     async fn put(
         &self,
         uri: &str,
-        data: Vec<u8>,
+        data: bytes::Bytes,
         io_stats: Option<IOStatsRef>,
     ) -> super::Result<()>;
 
diff --git a/src/daft-io/src/s3_like.rs b/src/daft-io/src/s3_like.rs
index ef7a7cb836..9140efd7aa 100644
--- a/src/daft-io/src/s3_like.rs
+++ b/src/daft-io/src/s3_like.rs
@@ -933,7 +933,7 @@ impl S3LikeSource {
         &self,
         _permit: OwnedSemaphorePermit,
         uri: &str,
-        data: Vec<u8>,
+        data: bytes::Bytes,
         region: &Region,
     ) -> super::Result<()> {
         log::debug!(
@@ -1014,7 +1014,7 @@ impl ObjectSource for S3LikeSource {
     async fn put(
         &self,
         uri: &str,
-        data: Vec<u8>,
+        data: bytes::Bytes,
         io_stats: Option<IOStatsRef>,
     ) -> super::Result<()> {
         let data_len = data.len();
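The payoff of swapping `Vec<u8>` for `bytes::Bytes` in the `put` signatures is that `Bytes` is a reference-counted buffer: clones and slices share one allocation instead of deep-copying, which matters once a row's payload is handed to retrying SDK calls or multiple tasks. A minimal standalone sketch (not Daft code) of that behavior:

```rust
use bytes::Bytes;

fn main() {
    // Converting a Vec<u8> into Bytes takes ownership of the allocation; no copy.
    let payload = Bytes::from(vec![1u8, 2, 3, 4, 5]);

    // clone() is an atomic refcount bump, not a memcpy of the payload.
    let for_retry = payload.clone();

    // slice() also shares the underlying buffer.
    let tail = payload.slice(3..);

    assert_eq!(for_retry, payload);
    assert_eq!(&tail[..], &[4, 5]);
}
```

The AWS Rust SDK's `ByteStream` can likewise be built from a `Bytes` via `From` without another copy, though the hunks above don't show that call site.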
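The new doc comment says memory consumption is tuned via `max_connections`, the number of in-flight tokio tasks. A hypothetical sketch of that bounding pattern with a tokio `Semaphore` (`upload_one`, `rows`, and the destination URIs are illustrative stand-ins, not Daft's actual helpers):

```rust
use std::sync::Arc;

use bytes::Bytes;
use tokio::sync::Semaphore;

// Illustrative stand-in for a single `put` call against an ObjectSource.
async fn upload_one(dest: String, data: Bytes) {
    println!("uploading {dest} ({} bytes)", data.len());
}

async fn upload_all(rows: Vec<Bytes>, max_connections: usize) {
    let semaphore = Arc::new(Semaphore::new(max_connections));
    let mut handles = Vec::with_capacity(rows.len());
    for (i, row) in rows.into_iter().enumerate() {
        // Blocks once `max_connections` uploads are in flight, so at most
        // that many rows are materialized in memory at any given time.
        let permit = semaphore.clone().acquire_owned().await.unwrap();
        handles.push(tokio::spawn(async move {
            let _permit = permit; // dropped (released) when the upload finishes
            upload_one(format!("s3://bucket/{i}"), row).await;
        }));
    }
    for handle in handles {
        handle.await.unwrap();
    }
}

#[tokio::main]
async fn main() {
    let rows = vec![Bytes::from_static(b"hello"), Bytes::from_static(b"world")];
    upload_all(rows, 8).await;
}
```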
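On the TODO about a `bytes::Bytes` that references the arrow2 buffer without a copy: newer releases of the `bytes` crate (1.9+, later than this patch) expose `Bytes::from_owner`, which wraps any `AsRef<[u8]> + Send + 'static` owner and keeps it alive through the refcount. Whether that fits arrow2's `Buffer` is an open assumption; the sketch below uses an `Arc<Vec<u8>>` as a stand-in owner:

```rust
use std::sync::Arc;

use bytes::Bytes;

// Stand-in owner for an arrow2 Buffer<u8>: anything AsRef<[u8]> + Send + 'static works.
struct ArcBuf(Arc<Vec<u8>>);

impl AsRef<[u8]> for ArcBuf {
    fn as_ref(&self) -> &[u8] {
        self.0.as_slice()
    }
}

fn bytes_view(owner: Arc<Vec<u8>>, start: usize, len: usize) -> Bytes {
    // from_owner (bytes >= 1.9) wraps the owner without copying; the returned
    // Bytes keeps the owner alive via its refcount, and slice() shares storage.
    Bytes::from_owner(ArcBuf(owner)).slice(start..start + len)
}

fn main() {
    let owner = Arc::new((0u8..16).collect::<Vec<u8>>());
    let view = bytes_view(owner.clone(), 4, 4);
    assert_eq!(&view[..], &[4, 5, 6, 7]);
    // The allocation is shared between `owner` and `view`, not duplicated.
    assert_eq!(Arc::strong_count(&owner), 2);
}
```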