Skip to content

Commit

Permalink
Minor cleanup to use Bytes instead of Vec<u8>
Browse files Browse the repository at this point in the history
  • Loading branch information
Jay Chia committed Jul 3, 2024
1 parent 83178b1 commit bf7f204
Show file tree
Hide file tree
Showing 7 changed files with 26 additions and 18 deletions.
2 changes: 1 addition & 1 deletion src/daft-io/src/azure_blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,7 @@ impl ObjectSource for AzureBlobSource {
async fn put(
&self,
_uri: &str,
_data: Vec<u8>,
_data: bytes::Bytes,
_io_stats: Option<IOStatsRef>,
) -> super::Result<()> {
todo!();

Check warning on line 538 in src/daft-io/src/azure_blob.rs

View check run for this annotation

Codecov / codecov/patch

src/daft-io/src/azure_blob.rs#L532-L538

Added lines #L532 - L538 were not covered by tests
Expand Down
2 changes: 1 addition & 1 deletion src/daft-io/src/google_cloud.rs
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ impl ObjectSource for GCSSource {
async fn put(
&self,
_uri: &str,
_data: Vec<u8>,
_data: bytes::Bytes,
_io_stats: Option<IOStatsRef>,
) -> super::Result<()> {
todo!();
Expand Down
2 changes: 1 addition & 1 deletion src/daft-io/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ impl ObjectSource for HttpSource {
async fn put(
&self,
_uri: &str,
_data: Vec<u8>,
_data: bytes::Bytes,
_io_stats: Option<IOStatsRef>,
) -> super::Result<()> {
todo!();
Expand Down
30 changes: 19 additions & 11 deletions src/daft-io/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ impl IOClient {
pub async fn single_url_put(
&self,
dest: &str,
data: Vec<u8>,
data: bytes::Bytes,
io_stats: Option<IOStatsRef>,
) -> Result<()> {
let (scheme, dest) = parse_url(dest)?;
Expand Down Expand Up @@ -313,7 +313,7 @@ impl IOClient {
&self,
index: usize,
dest: String,
data: Option<Vec<u8>>,
data: Option<bytes::Bytes>,
io_stats: Option<IOStatsRef>,
) -> Result<Option<String>> {
let value = if let Some(data) = data {
Expand Down Expand Up @@ -601,6 +601,10 @@ pub fn url_download(
}
}

/// Uploads data from a Binary/FixedSizeBinary/Utf8 Series to the provided folder_path
///
/// This performs an async upload of each row, and creates in-memory copies of the data that is currently in-flight.
/// Memory consumption should be tunable by configuring `max_connections`, which tunes the number of in-flight tokio tasks.
pub fn upload_to_folder(
series: &Series,
folder_path: &str,
Expand All @@ -611,7 +615,13 @@ pub fn upload_to_folder(
) -> DaftResult<Series> {
fn _upload_bytes_to_folder(
folder_path: &str,
bytes_iter: impl Iterator<Item = Option<Vec<u8>>>,
// TODO: We can further optimize this for larger rows by using instead an Iterator<Item = bytes::Bytes>
// This would allow us to iteratively copy smaller chunks of data and feed it to the AWS SDKs, instead
// of materializing the entire row at once as a single bytes::Bytes.
//
// Alternatively, we can find a way of creating a `bytes::Bytes` that just references the underlying
// arrow2 buffer, without making a copy. This would be the ideal case.
bytes_iter: impl Iterator<Item = Option<bytes::Bytes>>,
max_connections: usize,
multi_thread: bool,
config: Arc<IOConfig>,
Expand Down Expand Up @@ -678,7 +688,7 @@ pub fn upload_to_folder(
.unwrap()
.as_arrow()
.iter()
.map(|bytes_slice| bytes_slice.map(|b| b.to_vec()));
.map(|bytes_slice| bytes_slice.map(|b| bytes::Bytes::from(b.to_vec())));
_upload_bytes_to_folder(
folder_path,
bytes_iter,
Expand All @@ -694,7 +704,7 @@ pub fn upload_to_folder(
.unwrap()
.as_arrow()
.iter()
.map(|bytes_slice| bytes_slice.map(|b| b.to_vec()));
.map(|bytes_slice| bytes_slice.map(|b| bytes::Bytes::from(b.to_vec())));
_upload_bytes_to_folder(
folder_path,
bytes_iter,
Expand All @@ -705,12 +715,10 @@ pub fn upload_to_folder(
)

Check warning on line 715 in src/daft-io/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

src/daft-io/src/lib.rs#L702-L715

Added lines #L702 - L715 were not covered by tests
}
DataType::Utf8 => {
let bytes_iter = series
.utf8()
.unwrap()
.as_arrow()
.iter()
.map(|utf8_slice| utf8_slice.map(|s| s.as_bytes().to_vec()));
let bytes_iter =
series.utf8().unwrap().as_arrow().iter().map(|utf8_slice| {
utf8_slice.map(|s| bytes::Bytes::from(s.as_bytes().to_vec()))
});
_upload_bytes_to_folder(
folder_path,
bytes_iter,
Expand Down
2 changes: 1 addition & 1 deletion src/daft-io/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ impl ObjectSource for LocalSource {
async fn put(
&self,
uri: &str,
data: Vec<u8>,
data: bytes::Bytes,
_io_stats: Option<IOStatsRef>,
) -> super::Result<()> {
const LOCAL_PROTOCOL: &str = "file://";
Expand Down
2 changes: 1 addition & 1 deletion src/daft-io/src/object_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ pub(crate) trait ObjectSource: Sync + Send {
async fn put(
&self,
uri: &str,
data: Vec<u8>,
data: bytes::Bytes,
io_stats: Option<IOStatsRef>,
) -> super::Result<()>;

Expand Down
4 changes: 2 additions & 2 deletions src/daft-io/src/s3_like.rs
Original file line number Diff line number Diff line change
Expand Up @@ -933,7 +933,7 @@ impl S3LikeSource {
&self,
_permit: OwnedSemaphorePermit,
uri: &str,
data: Vec<u8>,
data: bytes::Bytes,
region: &Region,
) -> super::Result<()> {
log::debug!(
Expand Down Expand Up @@ -1014,7 +1014,7 @@ impl ObjectSource for S3LikeSource {
async fn put(
&self,
uri: &str,
data: Vec<u8>,
data: bytes::Bytes,
io_stats: Option<IOStatsRef>,
) -> super::Result<()> {
let data_len = data.len();
Expand Down

0 comments on commit bf7f204

Please sign in to comment.