From 56f686a3bf73b980bbc4ab56830d9ad97ccfb5c1 Mon Sep 17 00:00:00 2001 From: William Wen Date: Fri, 6 Dec 2024 13:58:05 +0800 Subject: [PATCH 1/2] avoid async_trait for FileRead and provide object safe dyn methods --- crates/iceberg/src/io/file_io.rs | 69 ++++++++++++++++++++++++++------ 1 file changed, 57 insertions(+), 12 deletions(-) diff --git a/crates/iceberg/src/io/file_io.rs b/crates/iceberg/src/io/file_io.rs index 8365d622c..9c8c83f31 100644 --- a/crates/iceberg/src/io/file_io.rs +++ b/crates/iceberg/src/io/file_io.rs @@ -16,6 +16,7 @@ // under the License. use std::collections::HashMap; +use std::future::Future; use std::ops::Range; use std::sync::Arc; @@ -62,8 +63,8 @@ impl FileIO { ErrorKind::DataInvalid, "Input is neither a valid url nor path", ) - .with_context("input", path.as_ref().to_string()) - .with_source(e) + .with_context("input", path.as_ref().to_string()) + .with_source(e) }) })?; @@ -178,7 +179,7 @@ impl FileIOBuilder { /// Add argument for operator. pub fn with_props( mut self, - args: impl IntoIterator, + args: impl IntoIterator, ) -> Self { self.props .extend(args.into_iter().map(|e| (e.0.to_string(), e.1.to_string()))); @@ -203,20 +204,64 @@ pub struct FileMetadata { } /// Trait for reading file. -/// -/// # TODO -/// -/// It's possible for us to remove the async_trait, but we need to figure -/// out how to handle the object safety. -#[async_trait::async_trait] -pub trait FileRead: Send + Unpin + 'static { +pub trait FileRead: Send + Sync + 'static { /// Read file content with given range. /// /// TODO: we can support reading non-contiguous bytes in the future. - async fn read(&self, range: Range) -> crate::Result; + fn read(&self, range: Range) -> impl Future> + Send + '_; } -#[async_trait::async_trait] +mod dyn_trait { + use std::ops::{Deref, Range}; + use std::sync::Arc; + use bytes::Bytes; + use crate::io::FileRead; + use super::Result; + #[async_trait::async_trait] + /// `FileRead` with object safety + pub trait DynFileRead: Send + Sync + 'static { + /// `read` of `FileRead` + async fn read(&self, range: Range) -> Result; + } + + #[async_trait::async_trait] + impl DynFileRead for R { + async fn read(&self, range: Range) -> Result { + self.read(range).await + } + } + + trait DynFileReadPointer: Deref + Send + Sync + 'static {} + + impl FileRead for P { + async fn read(&self, range: Range) -> Result { + (**self).read(range).await + } + } + + impl DynFileReadPointer for Box {} + impl DynFileReadPointer for Arc {} + + /// Provides extra methods for `FileRead` + pub trait FileReadDynExt: FileRead + Sized { + /// Create a type erased `FileRead` wrapped with `Box` + fn boxed(self) -> Box { + assert_file_read(Box::new(self) as _) + } + + /// Create a type erased `FileRead` wrapped with `Arc` + fn arc(self) -> Arc { + assert_file_read(Arc::new(self) as _) + } + } + + fn assert_file_read(read: R) -> R { + read + } +} + +pub use dyn_trait::{DynFileRead, FileReadDynExt}; + impl FileRead for opendal::Reader { async fn read(&self, range: Range) -> crate::Result { Ok(opendal::Reader::read(self, range).await?.to_bytes()) From 3e443108f91938257d86c88a39b7b7560a28ecce Mon Sep 17 00:00:00 2001 From: William Wen Date: Fri, 6 Dec 2024 14:25:47 +0800 Subject: [PATCH 2/2] impl trait --- crates/iceberg/src/io/file_io.rs | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/crates/iceberg/src/io/file_io.rs b/crates/iceberg/src/io/file_io.rs index 9c8c83f31..5f46595fd 100644 --- a/crates/iceberg/src/io/file_io.rs +++ b/crates/iceberg/src/io/file_io.rs @@ -63,8 +63,8 @@ impl FileIO { ErrorKind::DataInvalid, "Input is neither a valid url nor path", ) - .with_context("input", path.as_ref().to_string()) - .with_source(e) + .with_context("input", path.as_ref().to_string()) + .with_source(e) }) })?; @@ -179,7 +179,7 @@ impl FileIOBuilder { /// Add argument for operator. pub fn with_props( mut self, - args: impl IntoIterator, + args: impl IntoIterator, ) -> Self { self.props .extend(args.into_iter().map(|e| (e.0.to_string(), e.1.to_string()))); @@ -208,15 +208,17 @@ pub trait FileRead: Send + Sync + 'static { /// Read file content with given range. /// /// TODO: we can support reading non-contiguous bytes in the future. - fn read(&self, range: Range) -> impl Future> + Send + '_; + fn read(&self, range: Range) -> impl Future> + Send + '_; } mod dyn_trait { use std::ops::{Deref, Range}; use std::sync::Arc; + use bytes::Bytes; - use crate::io::FileRead; + use super::Result; + use crate::io::FileRead; #[async_trait::async_trait] /// `FileRead` with object safety pub trait DynFileRead: Send + Sync + 'static { @@ -231,7 +233,7 @@ mod dyn_trait { } } - trait DynFileReadPointer: Deref + Send + Sync + 'static {} + trait DynFileReadPointer: Deref + Send + Sync + 'static {} impl FileRead for P { async fn read(&self, range: Range) -> Result { @@ -258,6 +260,8 @@ mod dyn_trait { fn assert_file_read(read: R) -> R { read } + + impl FileReadDynExt for R {} } pub use dyn_trait::{DynFileRead, FileReadDynExt};